From 53d8bf6b1c27299d2020133699bafb3b83f7e094 Mon Sep 17 00:00:00 2001 From: "takahiro.nakayama" Date: Mon, 6 Jul 2015 23:35:52 +0900 Subject: [PATCH 01/10] BUILD SUCCESSFUL --- build.gradle | 1 + .../org/embulk/output/HdfsOutputPlugin.java | 190 ++++++++++++++---- 2 files changed, 156 insertions(+), 35 deletions(-) diff --git a/build.gradle b/build.gradle index 30dcb78..41b75ef 100644 --- a/build.gradle +++ b/build.gradle @@ -19,6 +19,7 @@ dependencies { provided "org.embulk:embulk-core:0.6.16" // compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION" compile 'org.apache.hadoop:hadoop-client:2.6.0' + compile 'com.google.guava:guava:14.0' testCompile "junit:junit:4.+" } diff --git a/src/main/java/org/embulk/output/HdfsOutputPlugin.java b/src/main/java/org/embulk/output/HdfsOutputPlugin.java index 39b4ff7..aa072ec 100644 --- a/src/main/java/org/embulk/output/HdfsOutputPlugin.java +++ b/src/main/java/org/embulk/output/HdfsOutputPlugin.java @@ -1,27 +1,20 @@ package org.embulk.output; -import org.embulk.config.CommitReport; -import org.embulk.config.Config; -import org.embulk.config.ConfigDefault; -import org.embulk.config.ConfigDiff; -import org.embulk.config.ConfigSource; -import org.embulk.config.Task; -import org.embulk.config.TaskSource; -import org.embulk.spi.Exec; -import org.embulk.spi.OutputPlugin; -import org.embulk.spi.PageOutput; -import org.embulk.spi.Schema; -import org.embulk.spi.TransactionalPageOutput; +import com.google.common.base.Throwables; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Progressable; +import org.embulk.config.*; +import org.embulk.spi.*; +import org.jruby.embed.ScriptingContainer; +import org.slf4j.Logger; + +import java.io.IOException; +import java.io.OutputStream; import java.util.List; import java.util.Map; -public class HdfsOutputPlugin implements OutputPlugin +public class HdfsOutputPlugin implements FileOutputPlugin { private static final Logger logger = Exec.getLogger(HdfsOutputPlugin.class); @@ -35,52 +28,179 @@ public interface PluginTask extends Task @ConfigDefault("{}") public Map getConfig(); + @Config("sequence_format") + @ConfigDefault("\".%03d.%02d\"") + public String getSequenceFormat(); + @Config("output_path") - @ConfigDefault("/tmp/") + @ConfigDefault("/tmp/embulk.working.hdfs_output.%Y%m%d_%s") public String getOutputPath(); @Config("working_path") - @ConfigDefault("/tmp/embulk.hdfs_output.%Y%m%d_%s") + @ConfigDefault("/tmp/embulk.working.hdfs_output.%Y%m%d_%s") public String getWorkingPath(); + } @Override public ConfigDiff transaction(ConfigSource config, - Schema schema, int taskCount, - OutputPlugin.Control control) + FileOutputPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); - - // retryable (idempotent) output: - // return resume(task.dump(), schema, taskCount, control); - - // non-retryable (non-idempotent) output: - control.run(task.dump()); - return Exec.newConfigDiff(); + return resume(task.dump(), taskCount, control); } @Override public ConfigDiff resume(TaskSource taskSource, - Schema schema, int taskCount, - OutputPlugin.Control control) + int taskCount, + FileOutputPlugin.Control control) { - throw new UnsupportedOperationException("hdfs output plugin does not support resuming"); + control.run(taskSource); + return Exec.newConfigDiff(); } + @Override public void cleanup(TaskSource taskSource, - Schema schema, int taskCount, - List successCommitReports) + int taskCount, + List successCommitReports) { } @Override - public TransactionalPageOutput open(TaskSource taskSource, Schema schema, int taskIndex) + public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex) { PluginTask task = taskSource.loadTask(PluginTask.class); - // TODO - throw new UnsupportedOperationException("The 'open' method needs to be implemented"); + Configuration configuration = getHdfsConfiguration(task); + FileSystem fs = getFs(configuration); + String workingPath = strftime(task.getWorkingPath()); + return new TransactionalHdfsFileOutput(task, fs, workingPath, taskIndex); + } + + private Configuration getHdfsConfiguration(final PluginTask task) + { + Configuration configuration = new Configuration(); + + List configFiles = task.getConfigFiles(); + for (Object configFile : configFiles) { + configuration.addResource(configFile.toString()); + } + + for (Map.Entry entry: task.getConfig().entrySet()) { + configuration.set(entry.getKey(), entry.getValue()); + } + + return configuration; + } + + private FileSystem getFs(final Configuration configuration) { + try { + FileSystem fs = FileSystem.get(configuration); + return fs; + } + catch (IOException e) { + logger.error(e.getMessage()); + throw Throwables.propagate(e); + } + } + + private String strftime(final String path) + { + // strftime + ScriptingContainer jruby = new ScriptingContainer(); + Object result = jruby.runScriptlet("Time.now.strftime('" + path + "')"); + return result.toString(); + } + + static class TransactionalHdfsFileOutput implements TransactionalFileOutput + { + private final int taskIndex; + private final FileSystem fs; + private final String workingPath; + private final String sequenceFormat; + + private int fileIndex = 0; + private int callCount = 0; + private Path currentPath = null; + private OutputStream currentStream = null; + + public TransactionalHdfsFileOutput(PluginTask task, FileSystem fs, String workingPath, int taskIndex) + { + this.taskIndex = taskIndex; + this.fs = fs; + this.workingPath = workingPath; + this.sequenceFormat = task.getSequenceFormat(); + } + + public void nextFile() { + closeCurrentStream(); + currentPath = new Path(workingPath + String.format(sequenceFormat, taskIndex, fileIndex)); + try { + if (fs.exists(currentPath)) { + // TODO: appropriate exception + throw Throwables.propagate(new IOException(currentPath.toString() + "already exists.")); + } + currentStream = fs.create(currentPath); + logger.info("Uploading '{}'", currentPath.toString()); + } + catch (IOException e) { + logger.error(e.getMessage()); + throw Throwables.propagate(e); + } + fileIndex++; + } + + @Override + public void add(Buffer buffer) { + if (currentStream == null) { + throw new IllegalStateException("nextFile() must be called before poll()"); + } + try { + logger.debug("#add called {} times for taskIndex {}", callCount, taskIndex); + currentStream.write(buffer.array(), buffer.offset(), buffer.limit()); + callCount++; + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + buffer.release(); + } + } + + @Override + public void finish() { + closeCurrentStream(); + } + + @Override + public void close() { + closeCurrentStream(); + } + + @Override + public void abort() { + } + + @Override + public CommitReport commit() { + CommitReport report = Exec.newCommitReport(); + report.set("files", currentPath); + return report; + } + + private void closeCurrentStream() { + try { + if (currentStream != null) { + currentStream.close(); + currentStream = null; + } + + callCount = 0; + } catch (IOException e) { + logger.error(e.getMessage()); + throw Throwables.propagate(e); + } + } } } From b2194e2f4128ed6baf5af0eb75a9f94acd7f42d5 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 7 Jul 2015 00:13:39 +0900 Subject: [PATCH 02/10] not working: data is written to local fs... --- src/main/java/org/embulk/output/HdfsOutputPlugin.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/embulk/output/HdfsOutputPlugin.java b/src/main/java/org/embulk/output/HdfsOutputPlugin.java index aa072ec..b742bda 100644 --- a/src/main/java/org/embulk/output/HdfsOutputPlugin.java +++ b/src/main/java/org/embulk/output/HdfsOutputPlugin.java @@ -33,11 +33,11 @@ public interface PluginTask extends Task public String getSequenceFormat(); @Config("output_path") - @ConfigDefault("/tmp/embulk.working.hdfs_output.%Y%m%d_%s") + @ConfigDefault("\"/tmp/embulk.working.hdfs_output.%Y%m%d_%s\"") public String getOutputPath(); @Config("working_path") - @ConfigDefault("/tmp/embulk.working.hdfs_output.%Y%m%d_%s") + @ConfigDefault("\"/tmp/embulk.working.hdfs_output.%Y%m%d_%s\"") public String getWorkingPath(); } From eedba92ef705f9edef87606a4735e40db7b1c4d3 Mon Sep 17 00:00:00 2001 From: "takahiro.nakayama" Date: Tue, 7 Jul 2015 08:33:57 +0900 Subject: [PATCH 03/10] modify error class and message. --- .../java/org/embulk/output/HdfsOutputPlugin.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/embulk/output/HdfsOutputPlugin.java b/src/main/java/org/embulk/output/HdfsOutputPlugin.java index b742bda..9da8bf3 100644 --- a/src/main/java/org/embulk/output/HdfsOutputPlugin.java +++ b/src/main/java/org/embulk/output/HdfsOutputPlugin.java @@ -4,6 +4,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Progressable; import org.embulk.config.*; import org.embulk.spi.*; import org.jruby.embed.ScriptingContainer; @@ -139,13 +140,19 @@ public void nextFile() { currentPath = new Path(workingPath + String.format(sequenceFormat, taskIndex, fileIndex)); try { if (fs.exists(currentPath)) { - // TODO: appropriate exception - throw Throwables.propagate(new IOException(currentPath.toString() + "already exists.")); + throw new IllegalAccessException(currentPath.toString() + "already exists."); } - currentStream = fs.create(currentPath); + currentStream = fs.create( + currentPath, + new Progressable() { + @Override + public void progress() { + logger.info("{} byte written.", 1); + } + }); logger.info("Uploading '{}'", currentPath.toString()); } - catch (IOException e) { + catch (IOException | IllegalAccessException e) { logger.error(e.getMessage()); throw Throwables.propagate(e); } From 58f4ade44b80d0035d32240a870efe721a3696a7 Mon Sep 17 00:00:00 2001 From: "takahiro.nakayama" Date: Tue, 7 Jul 2015 08:38:34 +0900 Subject: [PATCH 04/10] use more deep directory when working for easy way to remove and move files. --- src/main/java/org/embulk/output/HdfsOutputPlugin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/embulk/output/HdfsOutputPlugin.java b/src/main/java/org/embulk/output/HdfsOutputPlugin.java index 9da8bf3..9c0f2e6 100644 --- a/src/main/java/org/embulk/output/HdfsOutputPlugin.java +++ b/src/main/java/org/embulk/output/HdfsOutputPlugin.java @@ -137,7 +137,7 @@ public TransactionalHdfsFileOutput(PluginTask task, FileSystem fs, String workin public void nextFile() { closeCurrentStream(); - currentPath = new Path(workingPath + String.format(sequenceFormat, taskIndex, fileIndex)); + currentPath = new Path(workingPath + '/' + String.format(sequenceFormat, taskIndex, fileIndex)); try { if (fs.exists(currentPath)) { throw new IllegalAccessException(currentPath.toString() + "already exists."); From 2daa6928c2f29e49ef74e9c8c3c7e9a934137872 Mon Sep 17 00:00:00 2001 From: "takahiro.nakayama" Date: Tue, 7 Jul 2015 08:40:55 +0900 Subject: [PATCH 05/10] remove progresible --- .../org/embulk/output/HdfsOutputPlugin.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/embulk/output/HdfsOutputPlugin.java b/src/main/java/org/embulk/output/HdfsOutputPlugin.java index 9c0f2e6..d2bf2c9 100644 --- a/src/main/java/org/embulk/output/HdfsOutputPlugin.java +++ b/src/main/java/org/embulk/output/HdfsOutputPlugin.java @@ -4,9 +4,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.Progressable; import org.embulk.config.*; -import org.embulk.spi.*; +import org.embulk.spi.Buffer; +import org.embulk.spi.Exec; +import org.embulk.spi.FileOutputPlugin; +import org.embulk.spi.TransactionalFileOutput; import org.jruby.embed.ScriptingContainer; import org.slf4j.Logger; @@ -30,7 +32,7 @@ public interface PluginTask extends Task public Map getConfig(); @Config("sequence_format") - @ConfigDefault("\".%03d.%02d\"") + @ConfigDefault("\"%03d.%02d\"") public String getSequenceFormat(); @Config("output_path") @@ -142,14 +144,7 @@ public void nextFile() { if (fs.exists(currentPath)) { throw new IllegalAccessException(currentPath.toString() + "already exists."); } - currentStream = fs.create( - currentPath, - new Progressable() { - @Override - public void progress() { - logger.info("{} byte written.", 1); - } - }); + currentStream = fs.create(currentPath); logger.info("Uploading '{}'", currentPath.toString()); } catch (IOException | IllegalAccessException e) { From 88436bd71c2969149ca8f41c29c184c904290351 Mon Sep 17 00:00:00 2001 From: "takahiro.nakayama" Date: Tue, 7 Jul 2015 08:49:40 +0900 Subject: [PATCH 06/10] add procedure to finalize --- .../java/org/embulk/output/HdfsOutputPlugin.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/embulk/output/HdfsOutputPlugin.java b/src/main/java/org/embulk/output/HdfsOutputPlugin.java index d2bf2c9..86d665e 100644 --- a/src/main/java/org/embulk/output/HdfsOutputPlugin.java +++ b/src/main/java/org/embulk/output/HdfsOutputPlugin.java @@ -79,7 +79,8 @@ public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex) Configuration configuration = getHdfsConfiguration(task); FileSystem fs = getFs(configuration); String workingPath = strftime(task.getWorkingPath()); - return new TransactionalHdfsFileOutput(task, fs, workingPath, taskIndex); + String outputPath = strftime(task.getOutputPath()); + return new TransactionalHdfsFileOutput(task, fs, workingPath, outputPath, taskIndex); } private Configuration getHdfsConfiguration(final PluginTask task) @@ -122,6 +123,7 @@ static class TransactionalHdfsFileOutput implements TransactionalFileOutput private final int taskIndex; private final FileSystem fs; private final String workingPath; + private final String outputPath; private final String sequenceFormat; private int fileIndex = 0; @@ -129,11 +131,12 @@ static class TransactionalHdfsFileOutput implements TransactionalFileOutput private Path currentPath = null; private OutputStream currentStream = null; - public TransactionalHdfsFileOutput(PluginTask task, FileSystem fs, String workingPath, int taskIndex) + public TransactionalHdfsFileOutput(PluginTask task, FileSystem fs, String workingPath, String outputPath, int taskIndex) { this.taskIndex = taskIndex; this.fs = fs; this.workingPath = workingPath; + this.outputPath = outputPath; this.sequenceFormat = task.getSequenceFormat(); } @@ -172,6 +175,12 @@ public void add(Buffer buffer) { @Override public void finish() { + try { + fs.rename(new Path(workingPath), new Path(outputPath)); + } catch (IOException e) { + logger.error(e.getMessage()); + throw Throwables.propagate(e); + } closeCurrentStream(); } From a8751b66749d422e08fffcc18605fab2deae775b Mon Sep 17 00:00:00 2001 From: "takahiro.nakayama" Date: Tue, 7 Jul 2015 08:56:58 +0900 Subject: [PATCH 07/10] No lease --- src/main/java/org/embulk/output/HdfsOutputPlugin.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/embulk/output/HdfsOutputPlugin.java b/src/main/java/org/embulk/output/HdfsOutputPlugin.java index 86d665e..2676438 100644 --- a/src/main/java/org/embulk/output/HdfsOutputPlugin.java +++ b/src/main/java/org/embulk/output/HdfsOutputPlugin.java @@ -36,7 +36,7 @@ public interface PluginTask extends Task public String getSequenceFormat(); @Config("output_path") - @ConfigDefault("\"/tmp/embulk.working.hdfs_output.%Y%m%d_%s\"") + @ConfigDefault("\"/tmp/embulk.output.hdfs_output.%Y%m%d_%s\"") public String getOutputPath(); @Config("working_path") @@ -176,7 +176,9 @@ public void add(Buffer buffer) { @Override public void finish() { try { - fs.rename(new Path(workingPath), new Path(outputPath)); + Path outputHdfsPath = new Path(outputPath); + fs.mkdirs(outputHdfsPath); + fs.rename(new Path(workingPath), outputHdfsPath); } catch (IOException e) { logger.error(e.getMessage()); throw Throwables.propagate(e); From e57d271f15fedba2a9f1d851b0e0164a4b0ba22f Mon Sep 17 00:00:00 2001 From: "takahiro.nakayama" Date: Tue, 7 Jul 2015 09:02:11 +0900 Subject: [PATCH 08/10] experimental --- .../java/org/embulk/output/HdfsOutputPlugin.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/embulk/output/HdfsOutputPlugin.java b/src/main/java/org/embulk/output/HdfsOutputPlugin.java index 2676438..190e370 100644 --- a/src/main/java/org/embulk/output/HdfsOutputPlugin.java +++ b/src/main/java/org/embulk/output/HdfsOutputPlugin.java @@ -175,14 +175,6 @@ public void add(Buffer buffer) { @Override public void finish() { - try { - Path outputHdfsPath = new Path(outputPath); - fs.mkdirs(outputHdfsPath); - fs.rename(new Path(workingPath), outputHdfsPath); - } catch (IOException e) { - logger.error(e.getMessage()); - throw Throwables.propagate(e); - } closeCurrentStream(); } @@ -197,6 +189,13 @@ public void abort() { @Override public CommitReport commit() { + try { + fs.rename(new Path(workingPath), new Path(outputPath)); + } catch (IOException e) { + logger.error(e.getMessage()); + throw Throwables.propagate(e); + } + CommitReport report = Exec.newCommitReport(); report.set("files", currentPath); return report; From 96e7cd45ccd4613303019000b486e274d397dabd Mon Sep 17 00:00:00 2001 From: "takahiro.nakayama" Date: Tue, 7 Jul 2015 09:04:24 +0900 Subject: [PATCH 09/10] add logging property --- src/main/java/org/embulk/output/HdfsOutputPlugin.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/embulk/output/HdfsOutputPlugin.java b/src/main/java/org/embulk/output/HdfsOutputPlugin.java index 190e370..4cc0058 100644 --- a/src/main/java/org/embulk/output/HdfsOutputPlugin.java +++ b/src/main/java/org/embulk/output/HdfsOutputPlugin.java @@ -191,6 +191,7 @@ public void abort() { public CommitReport commit() { try { fs.rename(new Path(workingPath), new Path(outputPath)); + logger.info("rename {} => {}", workingPath, outputPath); } catch (IOException e) { logger.error(e.getMessage()); throw Throwables.propagate(e); From 9a6e5736ad5e7ebc215a21d9b7609688837df1c3 Mon Sep 17 00:00:00 2001 From: "takahiro.nakayama" Date: Tue, 7 Jul 2015 16:04:37 +0900 Subject: [PATCH 10/10] modify a bit --- build.gradle | 2 +- embulk-output-hdfs.iml | 120 ----------------------------------------- 2 files changed, 1 insertion(+), 121 deletions(-) delete mode 100644 embulk-output-hdfs.iml diff --git a/build.gradle b/build.gradle index 41b75ef..1f50e48 100644 --- a/build.gradle +++ b/build.gradle @@ -59,7 +59,7 @@ Gem::Specification.new do |spec| spec.description = %[Dumps records to Hdfs.] spec.email = ["civitaspo@gmail.com"] spec.licenses = ["MIT"] - # TODO set this: spec.homepage = "https://github.com/civitaspo/embulk-output-hdfs" + spec.homepage = "https://github.com/civitaspo/embulk-output-hdfs" spec.files = `git ls-files`.split("\n") + Dir["classpath/*.jar"] spec.test_files = spec.files.grep(%r"^(test|spec)/") diff --git a/embulk-output-hdfs.iml b/embulk-output-hdfs.iml deleted file mode 100644 index 5f2b3ec..0000000 --- a/embulk-output-hdfs.iml +++ /dev/null @@ -1,120 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file