diff --git a/build.gradle b/build.gradle
index 30dcb78..1f50e48 100644
--- a/build.gradle
+++ b/build.gradle
@@ -19,6 +19,7 @@ dependencies {
provided "org.embulk:embulk-core:0.6.16"
// compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION"
compile 'org.apache.hadoop:hadoop-client:2.6.0'
+ compile 'com.google.guava:guava:14.0'
testCompile "junit:junit:4.+"
}
@@ -58,7 +59,7 @@ Gem::Specification.new do |spec|
spec.description = %[Dumps records to Hdfs.]
spec.email = ["civitaspo@gmail.com"]
spec.licenses = ["MIT"]
- # TODO set this: spec.homepage = "https://github.com/civitaspo/embulk-output-hdfs"
+ spec.homepage = "https://github.com/civitaspo/embulk-output-hdfs"
spec.files = `git ls-files`.split("\n") + Dir["classpath/*.jar"]
spec.test_files = spec.files.grep(%r"^(test|spec)/")
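
The explicit Guava dependency lines up with the com.google.common.base.Throwables usage introduced in HdfsOutputPlugin below. Note that hadoop-client 2.6.0 already pulls in an older Guava transitively, so if in doubt which version ends up on the compile classpath, the resolved tree can be inspected with:

    ./gradlew dependencies --configuration compile
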
diff --git a/embulk-output-hdfs.iml b/embulk-output-hdfs.iml
deleted file mode 100644
index 5f2b3ec..0000000
--- a/embulk-output-hdfs.iml
+++ /dev/null
@@ -1,120 +0,0 @@
diff --git a/src/main/java/org/embulk/output/HdfsOutputPlugin.java b/src/main/java/org/embulk/output/HdfsOutputPlugin.java
index 39b4ff7..4cc0058 100644
--- a/src/main/java/org/embulk/output/HdfsOutputPlugin.java
+++ b/src/main/java/org/embulk/output/HdfsOutputPlugin.java
@@ -1,27 +1,23 @@
package org.embulk.output;
-import org.embulk.config.CommitReport;
-import org.embulk.config.Config;
-import org.embulk.config.ConfigDefault;
-import org.embulk.config.ConfigDiff;
-import org.embulk.config.ConfigSource;
-import org.embulk.config.Task;
-import org.embulk.config.TaskSource;
-import org.embulk.spi.Exec;
-import org.embulk.spi.OutputPlugin;
-import org.embulk.spi.PageOutput;
-import org.embulk.spi.Schema;
-import org.embulk.spi.TransactionalPageOutput;
+import com.google.common.base.Throwables;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Progressable;
+import org.embulk.config.*;
+import org.embulk.spi.Buffer;
+import org.embulk.spi.Exec;
+import org.embulk.spi.FileOutputPlugin;
+import org.embulk.spi.TransactionalFileOutput;
+import org.jruby.embed.ScriptingContainer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.List;
import java.util.Map;
-public class HdfsOutputPlugin implements OutputPlugin
+public class HdfsOutputPlugin implements FileOutputPlugin
{
private static final Logger logger = Exec.getLogger(HdfsOutputPlugin.class);
@@ -35,52 +31,189 @@ public interface PluginTask extends Task
@ConfigDefault("{}")
public Map<String, String> getConfig();
+ @Config("sequence_format")
+ @ConfigDefault("\"%03d.%02d\"")
+ public String getSequenceFormat();
+
@Config("output_path")
- @ConfigDefault("/tmp/")
+ @ConfigDefault("\"/tmp/embulk.output.hdfs_output.%Y%m%d_%s\"")
public String getOutputPath();
@Config("working_path")
- @ConfigDefault("/tmp/embulk.hdfs_output.%Y%m%d_%s")
+ @ConfigDefault("\"/tmp/embulk.working.hdfs_output.%Y%m%d_%s\"")
public String getWorkingPath();
+
}
@Override
public ConfigDiff transaction(ConfigSource config,
- Schema schema,
int taskCount,
- OutputPlugin.Control control)
+ FileOutputPlugin.Control control)
{
PluginTask task = config.loadConfig(PluginTask.class);
-
- // retryable (idempotent) output:
- // return resume(task.dump(), schema, taskCount, control);
-
- // non-retryable (non-idempotent) output:
- control.run(task.dump());
- return Exec.newConfigDiff();
+ return resume(task.dump(), taskCount, control);
}
@Override
public ConfigDiff resume(TaskSource taskSource,
- Schema schema, int taskCount,
- OutputPlugin.Control control)
+ int taskCount,
+ FileOutputPlugin.Control control)
{
- throw new UnsupportedOperationException("hdfs output plugin does not support resuming");
+ control.run(taskSource);
+ return Exec.newConfigDiff();
}
+
@Override
public void cleanup(TaskSource taskSource,
- Schema schema, int taskCount,
- List<CommitReport> successCommitReports)
+ int taskCount,
+ List<CommitReport> successCommitReports)
{
}
@Override
- public TransactionalPageOutput open(TaskSource taskSource, Schema schema, int taskIndex)
+ public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex)
{
PluginTask task = taskSource.loadTask(PluginTask.class);
- // TODO
- throw new UnsupportedOperationException("The 'open' method needs to be implemented");
+ Configuration configuration = getHdfsConfiguration(task);
+ FileSystem fs = getFs(configuration);
+ String workingPath = strftime(task.getWorkingPath());
+ String outputPath = strftime(task.getOutputPath());
+ return new TransactionalHdfsFileOutput(task, fs, workingPath, outputPath, taskIndex);
+ }
+
+ private Configuration getHdfsConfiguration(final PluginTask task)
+ {
+ Configuration configuration = new Configuration();
+
+ List<String> configFiles = task.getConfigFiles();
+ for (Object configFile : configFiles) {
+ configuration.addResource(configFile.toString());
+ }
+
+ for (Map.Entry<String, String> entry : task.getConfig().entrySet()) {
+ configuration.set(entry.getKey(), entry.getValue());
+ }
+
+ return configuration;
+ }
+
+ private FileSystem getFs(final Configuration configuration) {
+ try {
+ FileSystem fs = FileSystem.get(configuration);
+ return fs;
+ }
+ catch (IOException e) {
+ logger.error(e.getMessage());
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private String strftime(final String path)
+ {
+ // strftime
+ ScriptingContainer jruby = new ScriptingContainer();
+ Object result = jruby.runScriptlet("Time.now.strftime('" + path + "')");
+ return result.toString();
+ }
+
+ static class TransactionalHdfsFileOutput implements TransactionalFileOutput
+ {
+ private final int taskIndex;
+ private final FileSystem fs;
+ private final String workingPath;
+ private final String outputPath;
+ private final String sequenceFormat;
+
+ private int fileIndex = 0;
+ private int callCount = 0;
+ private Path currentPath = null;
+ private OutputStream currentStream = null;
+
+ public TransactionalHdfsFileOutput(PluginTask task, FileSystem fs, String workingPath, String outputPath, int taskIndex)
+ {
+ this.taskIndex = taskIndex;
+ this.fs = fs;
+ this.workingPath = workingPath;
+ this.outputPath = outputPath;
+ this.sequenceFormat = task.getSequenceFormat();
+ }
+
+ public void nextFile() {
+ closeCurrentStream();
+ currentPath = new Path(workingPath + '/' + String.format(sequenceFormat, taskIndex, fileIndex));
+ try {
+ if (fs.exists(currentPath)) {
+ throw new IllegalAccessException(currentPath.toString() + " already exists.");
+ }
+ currentStream = fs.create(currentPath);
+ logger.info("Uploading '{}'", currentPath.toString());
+ }
+ catch (IOException | IllegalAccessException e) {
+ logger.error(e.getMessage());
+ throw Throwables.propagate(e);
+ }
+ fileIndex++;
+ }
+
+ @Override
+ public void add(Buffer buffer) {
+ if (currentStream == null) {
+ throw new IllegalStateException("nextFile() must be called before add()");
+ }
+ try {
+ logger.debug("#add called {} times for taskIndex {}", callCount, taskIndex);
+ currentStream.write(buffer.array(), buffer.offset(), buffer.limit());
+ callCount++;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ buffer.release();
+ }
+ }
+
+ @Override
+ public void finish() {
+ closeCurrentStream();
+ }
+
+ @Override
+ public void close() {
+ closeCurrentStream();
+ }
+
+ @Override
+ public void abort() {
+ }
+
+ @Override
+ public CommitReport commit() {
+ try {
+ fs.rename(new Path(workingPath), new Path(outputPath));
+ logger.info("rename {} => {}", workingPath, outputPath);
+ } catch (IOException e) {
+ logger.error(e.getMessage());
+ throw Throwables.propagate(e);
+ }
+
+ CommitReport report = Exec.newCommitReport();
+ report.set("files", currentPath);
+ return report;
+ }
+
+ private void closeCurrentStream() {
+ try {
+ if (currentStream != null) {
+ currentStream.close();
+ currentStream = null;
+ }
+
+ callCount = 0;
+ } catch (IOException e) {
+ logger.error(e.getMessage());
+ throw Throwables.propagate(e);
+ }
+ }
}
}
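
For reference, a minimal Embulk configuration exercising the options introduced above might look like the sketch below. It is illustrative only: the config_files / config key names are inferred from the getConfigFiles() / getConfig() getters (their @Config annotations sit above this hunk), and the paths, host name, and formatter settings are placeholders.

    out:
      type: hdfs
      config_files:
        - /etc/hadoop/conf/core-site.xml
        - /etc/hadoop/conf/hdfs-site.xml
      config:
        fs.defaultFS: hdfs://namenode:8020
      output_path: /tmp/embulk.output.hdfs_output.%Y%m%d_%s
      working_path: /tmp/embulk.working.hdfs_output.%Y%m%d_%s
      sequence_format: '%03d.%02d'
      formatter:
        type: csv

With the default sequence_format of %03d.%02d, the first file written by task 0 lands at <working_path>/000.00 (String.format(sequenceFormat, taskIndex, fileIndex)), the next at 000.01, and so on; commit() then renames working_path to output_path.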