From c73175b5e8998c352b8c3198862c5a1322d43861 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Wed, 19 Aug 2015 17:54:38 +0900
Subject: [PATCH 01/13] v0.1.0 init

---
 build.gradle                                       |   2 +-
 lib/embulk/input/hdfs.rb                           |   2 +-
 .../org/embulk/input/HdfsFileInputPlugin.java      | 231 ------------------
 .../input/hdfs/HdfsFileInputPlugin.java            | 143 +++++++++++
 .../{ => hdfs}/TestHdfsFileInputPlugin.java        |   2 +-
 5 files changed, 146 insertions(+), 234 deletions(-)
 delete mode 100644 src/main/java/org/embulk/input/HdfsFileInputPlugin.java
 create mode 100644 src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
 rename src/test/java/org/embulk/input/{ => hdfs}/TestHdfsFileInputPlugin.java (57%)

diff --git a/build.gradle b/build.gradle
index a3e52a1..279d6ef 100644
--- a/build.gradle
+++ b/build.gradle
@@ -12,7 +12,7 @@ configurations {
     provided
 }
 
-version = "0.0.3"
+version = "0.1.0"
 
 sourceCompatibility = 1.7
 targetCompatibility = 1.7

diff --git a/lib/embulk/input/hdfs.rb b/lib/embulk/input/hdfs.rb
index 237ebfd..e658a75 100644
--- a/lib/embulk/input/hdfs.rb
+++ b/lib/embulk/input/hdfs.rb
@@ -1,3 +1,3 @@
 Embulk::JavaPlugin.register_input(
-  "hdfs", "org.embulk.input.HdfsFileInputPlugin",
+  "hdfs", "org.embulk.input.hdfs.HdfsFileInputPlugin",
   File.expand_path('../../../../classpath', __FILE__))

diff --git a/src/main/java/org/embulk/input/HdfsFileInputPlugin.java b/src/main/java/org/embulk/input/HdfsFileInputPlugin.java
deleted file mode 100644
index 0dfff22..0000000
--- a/src/main/java/org/embulk/input/HdfsFileInputPlugin.java
+++ /dev/null
@@ -1,231 +0,0 @@
-package org.embulk.input;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.embulk.config.*;
-import org.embulk.spi.BufferAllocator;
-import org.embulk.spi.Exec;
-import org.embulk.spi.FileInputPlugin;
-import org.embulk.spi.TransactionalFileInput;
-import org.embulk.spi.util.InputStreamFileInput;
-import org.jruby.embed.ScriptingContainer;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class HdfsFileInputPlugin implements FileInputPlugin
-{
-    private static final Logger logger = Exec.getLogger(HdfsFileInputPlugin.class);
-
-    public interface PluginTask extends Task
-    {
-        @Config("config_files")
-        @ConfigDefault("[]")
-        public List<String> getConfigFiles();
-
-        @Config("config")
-        @ConfigDefault("{}")
-        public Map<String, String> getConfig();
-
-        @Config("input_path")
-        public String getInputPath();
-
-        @Config("rewind_seconds")
-        @ConfigDefault("0")
-        public int getRewindSeconds();
-
-        public List<String> getTargetFiles();
-        public void setTargetFiles(List<String> targetFiles);
-
-        @ConfigInject
-        public BufferAllocator getBufferAllocator();
-    }
-
-    @Override
-    public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control)
-    {
-        PluginTask task = config.loadConfig(PluginTask.class);
-
-        // prepare
-        Configuration configuration = getHdfsConfiguration(task);
-        FileSystem fs = getFs(configuration);
-        Path inputPath = new Path(strftime(task.getInputPath(), task.getRewindSeconds()));
-
-        // listing
-        List<String> targetFiles;
-        try {
-            targetFiles = globRecursive(fs, inputPath);
-        } catch (IOException e) {
-            logger.error(e.getMessage());
-            throw new RuntimeException(e);
-        }
-        logger.info("Loading target files: {}", targetFiles);
-        task.setTargetFiles(targetFiles);
-
-        // number of processors is same with number of targets
-        int taskCount = targetFiles.size();
-
-        return resume(task.dump(), taskCount, control);
-    }
-
-    @Override
-    public ConfigDiff resume(TaskSource taskSource,
-                             int taskCount,
-                             FileInputPlugin.Control control)
-    {
-        control.run(taskSource, taskCount);
-        return Exec.newConfigDiff();
-    }
-
-    @Override
-    public void cleanup(TaskSource taskSource,
-                        int taskCount,
-                        List<TaskReport> successTaskReports)
-    {
-    }
-
-    @Override
-    public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
-    {
-        PluginTask task = taskSource.loadTask(PluginTask.class);
-
-        // prepare
-        Configuration configuration = getHdfsConfiguration(task);
-        FileSystem fs = getFs(configuration);
-
-        return new HdfsFileInput(task, fs, taskIndex);
-    }
-
-    private Configuration getHdfsConfiguration(final PluginTask task)
-    {
-        Configuration configuration = new Configuration();
-
-        for (Object configFile : task.getConfigFiles()) {
-            configuration.addResource(configFile.toString());
-        }
-        configuration.reloadConfiguration();
-
-        for (Map.Entry<String, String> entry: task.getConfig().entrySet()) {
-            configuration.set(entry.getKey(), entry.getValue());
-        }
-
-        return configuration;
-    }
-
-    private FileSystem getFs(final Configuration configuration)
-    {
-        try {
-            FileSystem fs = FileSystem.get(configuration);
-            return fs;
-        }
-        catch (IOException e) {
-            logger.error(e.getMessage());
-            throw new RuntimeException(e);
-        }
-    }
-
-    private String strftime(final String raw, final int rewind_seconds)
-    {
-        ScriptingContainer jruby = new ScriptingContainer();
-        Object resolved = jruby.runScriptlet(
-                String.format("(Time.now - %s).strftime('%s')", String.valueOf(rewind_seconds), raw));
-        return resolved.toString();
-    }
-
-    private List<String> globRecursive(final FileSystem fs, final Path hdfsPath) throws IOException
-    {
-        List<String> container = new ArrayList<String>();
-        for (FileStatus entry : fs.globStatus(hdfsPath)) {
-            if (entry.isDirectory()) {
-                container.addAll(listRecursive(fs, entry));
-            }
-            else {
-                container.add(entry.getPath().toString());
-            }
-        }
-        return container;
-    }
-
-    private List<String> listRecursive(final FileSystem fs, FileStatus status) throws IOException {
-        List<String> container = new ArrayList<String>();
-        if (status.isDirectory()) {
-            for (FileStatus entry : fs.listStatus(status.getPath())) {
-                container.addAll(listRecursive(fs, entry));
-            }
-        }
-        else {
-            container.add(status.getPath().toString());
-        }
-        return container;
-    }
-
-
-
-//    private List listUniquify(List stringList)
-//    {
-//        Set set = new HashSet();
-//        set.addAll(stringList);
-//        List uniqueStringList = new ArrayList();
-//        uniqueStringList.addAll(set);
-//        return uniqueStringList;
-//    }
-
-    public static class HdfsFileInput extends InputStreamFileInput implements TransactionalFileInput
-    {
-        private static class HdfsFileProvider implements InputStreamFileInput.Provider
-        {
-            private final FileSystem fs;
-            private final Path hdfsPath;
-            private boolean opened = false;
-
-            public HdfsFileProvider(PluginTask task, FileSystem fs, int taskIndex)
-            {
-                this.fs = fs;
-                this.hdfsPath = new Path(task.getTargetFiles().get(taskIndex));
-            }
-
-            @Override
-            public InputStream openNext() throws IOException
-            {
-                if (opened) {
-                    return null;
-                }
-
-                opened = true;
-                return fs.open(hdfsPath);
-            }
-
-            @Override
-            public void close()
-            {
-            }
-        }
-
-        public HdfsFileInput(PluginTask task, FileSystem fs, int taskIndex)
-        {
-            super(task.getBufferAllocator(), new HdfsFileProvider(task, fs, taskIndex));
-        }
-
-        @Override
-        public void close()
-        {
-        }
-
-        @Override
-        public void abort()
-        {
-        }
-
-        @Override
-        public TaskReport commit()
-        {
-            return Exec.newTaskReport();
-        }
-    }
-}

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
new file mode 100644
index 0000000..f94c796
--- /dev/null
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
@@ -0,0 +1,143 @@
+package org.embulk.input.hdfs;
+
+import java.util.List;
+import java.util.ArrayList;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import org.embulk.config.TaskReport;
+import org.embulk.config.Config;
+import org.embulk.config.ConfigDefault;
+import org.embulk.config.ConfigInject;
+import org.embulk.config.ConfigDiff;
+import org.embulk.config.ConfigSource;
+import org.embulk.config.Task;
+import org.embulk.config.TaskSource;
+import org.embulk.spi.Exec;
+import org.embulk.spi.FileInputPlugin;
+import org.embulk.spi.BufferAllocator;
+import org.embulk.spi.TransactionalFileInput;
+import org.embulk.spi.util.InputStreamTransactionalFileInput;
+
+public class HdfsFileInputPlugin
+        implements FileInputPlugin
+{
+    public interface PluginTask
+            extends Task
+    {
+        // configuration option 1 (required integer)
+        @Config("option1")
+        public int getOption1();
+
+        // configuration option 2 (optional string, null is not allowed)
+        @Config("optoin2")
+        @ConfigDefault("\"myvalue\"")
+        public String getOption2();
+
+        // configuration option 3 (optional string, null is allowed)
+        @Config("optoin3")
+        @ConfigDefault("null")
+        public Optional<String> getOption3();
+
+        //@Config("path_prefix")
+        //public String getPathPrefix();
+
+        //@Config("last_path")
+        //@ConfigDefault("null")
+        //public Optional<String> getLastPath();
+
+        // usually, you store list of files in task to pass them from transaction() to run().
+        //public List<String> getFiles();
+        //public void setFiles(List<String> files);
+
+        @ConfigInject
+        public BufferAllocator getBufferAllocator();
+    }
+
+    @Override
+    public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control)
+    {
+        PluginTask task = config.loadConfig(PluginTask.class);
+
+        // run() method is called for this number of times in parallel.
+        int taskCount = 1;
+
+        // usually, taskCount is number of input files.
+        //task.setFiles(listFiles(task));
+        //int taskCount = task.getFiles().size();
+
+        return resume(task.dump(), taskCount, control);
+    }
+
+    // usually, you have an method to create list of files
+    //List<String> listFiles(PluginTask task)
+    //{
+    //    final ImmutableList.Builder<String> builder = ImmutableList.builder();
+    //    for (String path : listFilesWithPrefix(task.getPathPrefix())) {
+    //        if (task.getLastPath().isPresent() && path.compareTo(task.getLastPath().get())) {
+    //            continue;
+    //        }
+    //        builder.add(path);
+    //    }
+    //    return builder.build();
+    //}
+
+    @Override
+    public ConfigDiff resume(TaskSource taskSource,
+            int taskCount,
+            FileInputPlugin.Control control)
+    {
+        control.run(taskSource, taskCount);
+
+        ConfigDiff configDiff = Exec.newConfigDiff();
+
+        // usually, yo uset last_path
+        //if (task.getFiles().isEmpty()) {
+        //    if (task.getLastPath().isPresent()) {
+        //        configDiff.set("last_path", task.getLastPath().get());
+        //    }
+        //} else {
+        //    List<String> files = new ArrayList<String>(task.getFiles());
+        //    Collections.sort(files);
+        //    configDiff.set("last_path", files.get(files.size() - 1));
+        //}
+
+        return configDiff;
+    }
+
+    @Override
+    public void cleanup(TaskSource taskSource,
+            int taskCount,
+            List<TaskReport> successTaskReports)
+    {
+    }
+
+    @Override
+    public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
+    {
+        final PluginTask task = taskSource.loadTask(PluginTask.class);
+
+        // Write your code here :)
+        throw new UnsupportedOperationException("HdfsFileInputPlugin.open method is not implemented yet");
+
+        // if you expect InputStream, you can use this code:
+
+        //InputStream input = openInputStream(task, task.getFiles().get(taskIndex));
+        //
+        //return new InputStreamTransactionalFileInput(task.getBufferAllocator(), input) {
+        //    @Override
+        //    public void abort()
+        //    { }
+        //
+        //    @Override
+        //    public TaskReport commit()
+        //    {
+        //        return Exec.newTaskReport();
+        //    }
+        //}
+    }
+
+    //private static InputStream openInputStream(PluginTask task, String path)
+    //{
+    //    return new MyInputStream(file);
+    //}
+}

diff --git a/src/test/java/org/embulk/input/TestHdfsFileInputPlugin.java b/src/test/java/org/embulk/input/hdfs/TestHdfsFileInputPlugin.java
similarity index 57%
rename from src/test/java/org/embulk/input/TestHdfsFileInputPlugin.java
rename to src/test/java/org/embulk/input/hdfs/TestHdfsFileInputPlugin.java
index 622e938..b8eb7c7 100644
--- a/src/test/java/org/embulk/input/TestHdfsFileInputPlugin.java
+++ b/src/test/java/org/embulk/input/hdfs/TestHdfsFileInputPlugin.java
@@ -1,4 +1,4 @@
-package org.embulk.input;
+package org.embulk.input.hdfs;
 
 public class TestHdfsFileInputPlugin
 {

From d38367615312f17e241e9e84e40850aacb0fdf44 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Wed, 19 Aug 2015 21:31:03 +0900
Subject: [PATCH 02/13] add .ruby-version to .gitignore

---
 .gitignore | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.gitignore b/.gitignore
index 3edd668..eab43f9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,5 @@
 build/
 .idea
 *.iml
+.ruby-version
+
From 96ce09deb610e73c75f62e8e0bac1e90aae4e1e5 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Thu, 20 Aug 2015 10:01:47 +0900
Subject: [PATCH 03/13] simplify the code

---
 .../input/hdfs/HdfsFileInputPlugin.java            | 191 +++++++++++++-----
 1 file changed, 140 insertions(+), 51 deletions(-)

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
index f94c796..26c0d46 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
@@ -1,9 +1,17 @@
 package org.embulk.input.hdfs;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Map;
+
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.embulk.config.TaskReport;
 import org.embulk.config.Config;
 import org.embulk.config.ConfigDefault;
@@ -16,38 +24,37 @@
 import org.embulk.spi.FileInputPlugin;
 import org.embulk.spi.BufferAllocator;
 import org.embulk.spi.TransactionalFileInput;
+import org.embulk.spi.util.InputStreamFileInput;
 import org.embulk.spi.util.InputStreamTransactionalFileInput;
+import org.jruby.embed.ScriptingContainer;
+import org.slf4j.Logger;
 
-public class HdfsFileInputPlugin
-        implements FileInputPlugin
+public class HdfsFileInputPlugin implements FileInputPlugin
 {
-    public interface PluginTask
-            extends Task
+    private static final Logger logger = Exec.getLogger(HdfsFileInputPlugin.class);
+
+    public interface PluginTask extends Task
     {
-        // configuration option 1 (required integer)
-        @Config("option1")
-        public int getOption1();
+        @Config("config_files")
+        @ConfigDefault("[]")
+        public List<String> getConfigFiles();
 
-        // configuration option 2 (optional string, null is not allowed)
-        @Config("optoin2")
-        @ConfigDefault("\"myvalue\"")
-        public String getOption2();
+        @Config("config")
+        @ConfigDefault("{}")
+        public Map<String, String> getConfig();
 
-        // configuration option 3 (optional string, null is allowed)
-        @Config("optoin3")
-        @ConfigDefault("null")
-        public Optional<String> getOption3();
+        @Config("input_path")
+        public String getInputPath();
 
-        //@Config("path_prefix")
-        //public String getPathPrefix();
+        @Config("rewind_seconds")
+        @ConfigDefault("0")
+        public int getRewindSeconds();
 
-        //@Config("last_path")
-        //@ConfigDefault("null")
-        //public Optional<String> getLastPath();
+        public List<Path> getHdfsFiles();
+        public void setHdfsFiles(List<Path> hdfsFiles);
 
-        // usually, you store list of files in task to pass them from transaction() to run().
-        //public List<String> getFiles();
-        //public void setFiles(List<String> files);
+        public FileSystem getHdfs();
+        public void setHdfs(FileSystem hdfs);
 
         @ConfigInject
         public BufferAllocator getBufferAllocator();
@@ -58,8 +65,29 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr
     {
         PluginTask task = config.loadConfig(PluginTask.class);
 
-        // run() method is called for this number of times in parallel.
-        int taskCount = 1;
+        // prepare: set FileSystem
+        try {
+            Configuration configuration = getHdfsConfiguration(task);
+            FileSystem hdfs = FileSystem.get(configuration);
+            task.setHdfs(hdfs);
+        }
+        catch (IOException e) {
+            logger.error(e.getMessage());
+            throw new RuntimeException(e);
+        }
+
+        // listing Files
+        String pathString = strftime(task.getInputPath(), task.getRewindSeconds());
+        try {
+            task.setHdfsFiles(buildFileList(task, pathString));
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+            throw new RuntimeException(e);
+        }
+        logger.info("Loading target files: {}", task.getHdfsFiles());
+
+        // number of processors is same with number of targets
+        int taskCount = task.getHdfsFiles().size();
 
         // usually, taskCount is number of input files.
         //task.setFiles(listFiles(task));
@@ -81,10 +109,71 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr
     //    return builder.build();
     //}
 
+    private Configuration getHdfsConfiguration(final PluginTask task)
+    {
+        Configuration configuration = new Configuration();
+
+        for (Object configFile : task.getConfigFiles()) {
+            configuration.addResource(configFile.toString());
+        }
+        configuration.reloadConfiguration();
+
+        for (Map.Entry<String, String> entry: task.getConfig().entrySet()) {
+            configuration.set(entry.getKey(), entry.getValue());
+        }
+
+        return configuration;
+    }
+
+    private String strftime(final String raw, final int rewind_seconds)
+    {
+        ScriptingContainer jruby = new ScriptingContainer();
+        Object resolved = jruby.runScriptlet(
+                String.format("(Time.now - %s).strftime('%s')", String.valueOf(rewind_seconds), raw));
+        return resolved.toString();
+    }
+
+    private List<Path> buildFileList(final PluginTask task, final String pathString) throws IOException
+    {
+        List<Path> fileList = new ArrayList<>();
+        for (FileStatus entry : task.getHdfs().globStatus(new Path(pathString))) {
+            if (entry.isDirectory()) {
+                fileList.addAll(lsr(task.getHdfs(), entry));
+            } else {
+                fileList.add(entry.getPath());
+            }
+        }
+        return fileList;
+    }
+
+    private List<Path> lsr(final FileSystem fs, FileStatus status) throws IOException
+    {
+        List<Path> fileList = new ArrayList<>();
+        if (status.isDirectory()) {
+            for (FileStatus entry : fs.listStatus(status.getPath())) {
+                fileList.addAll(lsr(fs, entry));
+            }
+        }
+        else {
+            fileList.add(status.getPath());
+        }
+        return fileList;
+    }
+
+// private List distinctFileList(List fileList)
+// {
+//     Set set = new HashSet();
+//     set.addAll(fileList);
+//     List uniqueFileList = new ArrayList();
+//     uniqueFileList.addAll(set);
+//     return uniqueFileList;
+// }
+
+
     @Override
     public ConfigDiff resume(TaskSource taskSource,
-            int taskCount,
-            FileInputPlugin.Control control)
+                             int taskCount,
+                             FileInputPlugin.Control control)
     {
         control.run(taskSource, taskCount);
 
@@ -106,8 +195,8 @@ public ConfigDiff resume(TaskSource taskSource,
 
     @Override
     public void cleanup(TaskSource taskSource,
-            int taskCount,
-            List<TaskReport> successTaskReports)
+                        int taskCount,
+                        List<TaskReport> successTaskReports)
     {
     }
 
@@ -116,28 +205,28 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
     {
         final PluginTask task = taskSource.loadTask(PluginTask.class);
 
-        // Write your code here :)
-        throw new UnsupportedOperationException("HdfsFileInputPlugin.open method is not implemented yet");
-
-        // if you expect InputStream, you can use this code:
-
-        //InputStream input = openInputStream(task, task.getFiles().get(taskIndex));
-        //
-        //return new InputStreamTransactionalFileInput(task.getBufferAllocator(), input) {
-        //    @Override
-        //    public void abort()
-        //    { }
-        //
-        //    @Override
-        //    public TaskReport commit()
-        //    {
-        //        return Exec.newTaskReport();
-        //    }
-        //}
+        InputStream input;
+        try {
+            input = openInputStream(task, task.getHdfsFiles().get(taskIndex));
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+            throw new RuntimeException(e);
+        }
+
+        return new InputStreamTransactionalFileInput(task.getBufferAllocator(), input) {
+            @Override
+            public void abort()
+            { }
+
+            @Override
+            public TaskReport commit()
+            {
+                return Exec.newTaskReport();
+            }
+        };
     }
 
-    //private static InputStream openInputStream(PluginTask task, String path)
-    //{
-    //    return new MyInputStream(file);
-    //}
+    private static InputStream openInputStream(PluginTask task, Path path) throws IOException {
+        return task.getHdfs().open(path);
+    }
 }
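A note on the strftime helper that patch 03 introduces: the raw input_path is handed to an embedded JRuby interpreter, so %Y-style format codes are expanded by Ruby's Time#strftime, evaluated against Time.now minus rewind_seconds. A minimal standalone sketch of that behavior (the path and rewind value here are illustrative, not taken from the patch):

    import org.jruby.embed.ScriptingContainer;

    public class StrftimeSketch
    {
        public static void main(String[] args)
        {
            // Hypothetical input_path; rewind one day (86400 seconds) so that a job
            // running shortly after midnight still resolves to yesterday's directory.
            String raw = "/user/embulk/test/%Y-%m-%d/*";
            int rewindSeconds = 86400;

            // Same scriptlet the plugin builds in strftime():
            ScriptingContainer jruby = new ScriptingContainer();
            Object resolved = jruby.runScriptlet(
                    String.format("(Time.now - %s).strftime('%s')", String.valueOf(rewindSeconds), raw));

            // Prints e.g. "/user/embulk/test/2015-08-19/*"; the glob itself is expanded
            // later by FileSystem#globStatus in buildFileList().
            System.out.println(resolved);
        }
    }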
From 16c4017c3b3da0ac506ea672ff6938cf60f16b62 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Thu, 20 Aug 2015 13:51:05 +0900
Subject: [PATCH 04/13] task should not have FileSystem.

---
 .../input/hdfs/HdfsFileInputPlugin.java            | 51 +++++++------------
 1 file changed, 18 insertions(+), 33 deletions(-)

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
index 26c0d46..61428b5 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
@@ -50,11 +50,8 @@ public interface PluginTask extends Task
         @ConfigDefault("0")
         public int getRewindSeconds();
 
-        public List<Path> getHdfsFiles();
-        public void setHdfsFiles(List<Path> hdfsFiles);
-
-        public FileSystem getHdfs();
-        public void setHdfs(FileSystem hdfs);
+        public List<String> getFiles();
+        public void setFiles(List<String> hdfsFiles);
 
         @ConfigInject
         public BufferAllocator getBufferAllocator();
@@ -65,29 +62,18 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr
     {
         PluginTask task = config.loadConfig(PluginTask.class);
 
-        // prepare: set FileSystem
-        try {
-            Configuration configuration = getHdfsConfiguration(task);
-            FileSystem hdfs = FileSystem.get(configuration);
-            task.setHdfs(hdfs);
-        }
-        catch (IOException e) {
-            logger.error(e.getMessage());
-            throw new RuntimeException(e);
-        }
-
         // listing Files
         String pathString = strftime(task.getInputPath(), task.getRewindSeconds());
         try {
-            task.setHdfsFiles(buildFileList(task, pathString));
+            task.setFiles(buildFileList(getFs(task), pathString));
         } catch (IOException e) {
             logger.error(e.getMessage());
             throw new RuntimeException(e);
         }
-        logger.info("Loading target files: {}", task.getHdfsFiles());
+        logger.info("Loading target files: {}", task.getFiles());
 
         // number of processors is same with number of targets
-        int taskCount = task.getHdfsFiles().size();
+        int taskCount = task.getFiles().size();
 
         // usually, taskCount is number of input files.
         //task.setFiles(listFiles(task));
@@ -109,8 +95,7 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr
     //    return builder.build();
     //}
 
-    private Configuration getHdfsConfiguration(final PluginTask task)
-    {
+    private static FileSystem getFs(final PluginTask task) throws IOException {
         Configuration configuration = new Configuration();
 
         for (Object configFile : task.getConfigFiles()) {
@@ -122,7 +107,7 @@ private static FileSystem getFs(final PluginTask task) throws IOException {
             configuration.set(entry.getKey(), entry.getValue());
         }
 
-        return configuration;
+        return FileSystem.get(configuration);
     }
 
     private String strftime(final String raw, final int rewind_seconds)
@@ -133,29 +118,29 @@ private String strftime(final String raw, final int rewind_seconds)
         return resolved.toString();
     }
 
-    private List<Path> buildFileList(final PluginTask task, final String pathString) throws IOException
+    private List<String> buildFileList(final FileSystem fs, final String pathString) throws IOException
     {
-        List<Path> fileList = new ArrayList<>();
-        for (FileStatus entry : task.getHdfs().globStatus(new Path(pathString))) {
+        List<String> fileList = new ArrayList<>();
+        for (FileStatus entry : fs.globStatus(new Path(pathString))) {
             if (entry.isDirectory()) {
-                fileList.addAll(lsr(task.getHdfs(), entry));
+                fileList.addAll(lsr(fs, entry));
             } else {
-                fileList.add(entry.getPath());
+                fileList.add(entry.getPath().toString());
             }
         }
         return fileList;
     }
 
-    private List<Path> lsr(final FileSystem fs, FileStatus status) throws IOException
+    private List<String> lsr(final FileSystem fs, FileStatus status) throws IOException
    {
-        List<Path> fileList = new ArrayList<>();
+        List<String> fileList = new ArrayList<>();
         if (status.isDirectory()) {
             for (FileStatus entry : fs.listStatus(status.getPath())) {
                 fileList.addAll(lsr(fs, entry));
             }
         }
         else {
-            fileList.add(status.getPath());
+            fileList.add(status.getPath().toString());
         }
         return fileList;
     }
@@ -207,7 +192,7 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
 
         InputStream input;
         try {
-            input = openInputStream(task, task.getHdfsFiles().get(taskIndex));
+            input = openInputStream(task, task.getFiles().get(taskIndex));
         } catch (IOException e) {
             logger.error(e.getMessage());
             throw new RuntimeException(e);
@@ -226,7 +211,7 @@ public TaskReport commit()
         };
     }
 
-    private static InputStream openInputStream(PluginTask task, Path path) throws IOException {
-        return task.getHdfs().open(path);
+    private static InputStream openInputStream(PluginTask task, String path) throws IOException {
+        return getFs(task).open(new Path(path));
    }
 }
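The rationale behind patch 04 is worth spelling out: everything stored on a PluginTask is serialized (as JSON) when task.dump() ships it from transaction() to the per-task open() calls, so the task can only carry plain data. A live Hadoop FileSystem holds sockets and caches and has no JSON form, which is why the commit keeps only path strings on the task and rebuilds the FileSystem from Configuration on demand. A sketch of the resulting shape (assuming Embulk's usual task round-trip; this is an illustration, not code from the patch):

    import java.util.List;
    import org.embulk.config.Task;

    // Hypothetical interface illustrating the constraint:
    public interface PluginTaskSketch extends Task
    {
        // OK: survives the JSON round-trip between transaction() and open().
        List<String> getFiles();
        void setFiles(List<String> files);

        // NOT OK: a live org.apache.hadoop.fs.FileSystem has no JSON
        // representation, so instead of a getHdfs() field here, every worker
        // calls a static helper (getFs(task)) that rebuilds the FileSystem
        // from config_files/config each time it is needed.
    }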
From 4dd40529a2d72f46d621aff532c22634ac5723f1 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Thu, 20 Aug 2015 20:45:22 +0900
Subject: [PATCH 05/13] cosmetic change.

---
 .../input/hdfs/HdfsFileInputPlugin.java            | 142 ++++++++----------
 1 file changed, 65 insertions(+), 77 deletions(-)

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
index 61428b5..b32b19f 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
@@ -65,8 +65,10 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr
         // listing Files
         String pathString = strftime(task.getInputPath(), task.getRewindSeconds());
         try {
+            // TODO: Create PartitionedFileList for using MultiThread
             task.setFiles(buildFileList(getFs(task), pathString));
-        } catch (IOException e) {
+        }
+        catch (IOException e) {
             logger.error(e.getMessage());
             throw new RuntimeException(e);
         }
@@ -75,25 +77,71 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr
         // number of processors is same with number of targets
         int taskCount = task.getFiles().size();
 
-        // usually, taskCount is number of input files.
-        //task.setFiles(listFiles(task));
-        //int taskCount = task.getFiles().size();
-
         return resume(task.dump(), taskCount, control);
     }
 
-    // usually, you have an method to create list of files
-    //List<String> listFiles(PluginTask task)
-    //{
-    //    final ImmutableList.Builder<String> builder = ImmutableList.builder();
-    //    for (String path : listFilesWithPrefix(task.getPathPrefix())) {
-    //        if (task.getLastPath().isPresent() && path.compareTo(task.getLastPath().get())) {
-    //            continue;
-    //        }
-    //        builder.add(path);
-    //    }
-    //    return builder.build();
-    //}
+    @Override
+    public ConfigDiff resume(TaskSource taskSource,
+                             int taskCount,
+                             FileInputPlugin.Control control)
+    {
+        control.run(taskSource, taskCount);
+
+        ConfigDiff configDiff = Exec.newConfigDiff();
+
+        // usually, yo uset last_path
+        //if (task.getFiles().isEmpty()) {
+        //    if (task.getLastPath().isPresent()) {
+        //        configDiff.set("last_path", task.getLastPath().get());
+        //    }
+        //} else {
+        //    List<String> files = new ArrayList<String>(task.getFiles());
+        //    Collections.sort(files);
+        //    configDiff.set("last_path", files.get(files.size() - 1));
+        //}
+
+        return configDiff;
+    }
+
+    @Override
+    public void cleanup(TaskSource taskSource,
+                        int taskCount,
+                        List<TaskReport> successTaskReports)
+    {
+    }
+
+    @Override
+    public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
+    {
+        final PluginTask task = taskSource.loadTask(PluginTask.class);
+
+        InputStream input;
+        try {
+            input = openInputStream(task, task.getFiles().get(taskIndex));
+        }
+        catch (IOException e) {
+            logger.error(e.getMessage());
+            throw new RuntimeException(e);
+        }
+
+        return new InputStreamTransactionalFileInput(task.getBufferAllocator(), input) {
+            @Override
+            public void abort()
+            { }
+
+            @Override
+            public TaskReport commit()
+            {
+                return Exec.newTaskReport();
+            }
+        };
+    }
+
+    // TODO: use PartitionedFileInputStream
+    private static InputStream openInputStream(PluginTask task, String path) throws IOException
+    {
+        return getFs(task).open(new Path(path));
+    }
 
     private static FileSystem getFs(final PluginTask task) throws IOException {
         Configuration configuration = new Configuration();
@@ -154,64 +202,4 @@ private List<String> lsr(final FileSystem fs, FileStatus status) throws IOExcept
 //     return uniqueFileList;
 // }
 
-
-    @Override
-    public ConfigDiff resume(TaskSource taskSource,
-                             int taskCount,
-                             FileInputPlugin.Control control)
-    {
-        control.run(taskSource, taskCount);
-
-        ConfigDiff configDiff = Exec.newConfigDiff();
-
-        // usually, yo uset last_path
-        //if (task.getFiles().isEmpty()) {
-        //    if (task.getLastPath().isPresent()) {
-        //        configDiff.set("last_path", task.getLastPath().get());
-        //    }
-        //} else {
-        //    List<String> files = new ArrayList<String>(task.getFiles());
-        //    Collections.sort(files);
-        //    configDiff.set("last_path", files.get(files.size() - 1));
-        //}
-
-        return configDiff;
-    }
-
-    @Override
-    public void cleanup(TaskSource taskSource,
-                        int taskCount,
-                        List<TaskReport> successTaskReports)
-    {
-    }
-
-    @Override
-    public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
-    {
-        final PluginTask task = taskSource.loadTask(PluginTask.class);
-
-        InputStream input;
-        try {
-            input = openInputStream(task, task.getFiles().get(taskIndex));
-        } catch (IOException e) {
-            logger.error(e.getMessage());
-            throw new RuntimeException(e);
-        }
-
-        return new InputStreamTransactionalFileInput(task.getBufferAllocator(), input) {
-            @Override
-            public void abort()
-            { }
-
-            @Override
-            public TaskReport commit()
-            {
-                return Exec.newTaskReport();
-            }
-        };
-    }
-
-    private static InputStream openInputStream(PluginTask task, String path) throws IOException {
-        return getFs(task).open(new Path(path));
-    }
 }

From 8589f6bbe24c184e6c7a41b25bf5ed6061e2c2e7 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Mon, 31 Aug 2015 04:03:21 +0900
Subject: [PATCH 06/13] guava 14.0 -> 15.0

---
 build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build.gradle b/build.gradle
index 279d6ef..53cc193 100644
--- a/build.gradle
+++ b/build.gradle
@@ -22,7 +22,7 @@ dependencies {
     provided "org.embulk:embulk-core:0.7.0"
     // compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION"
     compile 'org.apache.hadoop:hadoop-client:2.6.0'
-    compile 'com.google.guava:guava:14.0'
+    compile 'com.google.guava:guava:15.0'
     testCompile "junit:junit:4.+"
 }
From e322b223f352b39df57701253ca79322f955eb04 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Mon, 31 Aug 2015 04:03:39 +0900
Subject: [PATCH 07/13] support partition files.

ref. https://github.com/hito4t/embulk-input-filesplit
---
 .../input/hdfs/HdfsFileInputPlugin.java            |  40 +++--
 .../input/hdfs/HdfsFilePartitionManager.java       |  63 +++++++
 .../input/hdfs/HdfsFilePartitioner.java            |  39 +++++
 .../embulk/input/hdfs/HdfsPartialFile.java         |  37 +++++
 .../hdfs/HdfsPartialFileInputStream.java           | 154 ++++++++++++++++++
 5 files changed, 320 insertions(+), 13 deletions(-)
 create mode 100644 src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java
 create mode 100644 src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java
 create mode 100644 src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java
 create mode 100644 src/main/java/org/embulk/input/hdfs/HdfsPartialFileInputStream.java

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
index b32b19f..3e443a1 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
@@ -8,6 +8,7 @@
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -20,15 +21,14 @@
 import org.embulk.config.ConfigSource;
 import org.embulk.config.Task;
 import org.embulk.config.TaskSource;
-import org.embulk.spi.Exec;
-import org.embulk.spi.FileInputPlugin;
-import org.embulk.spi.BufferAllocator;
-import org.embulk.spi.TransactionalFileInput;
+import org.embulk.spi.*;
 import org.embulk.spi.util.InputStreamFileInput;
 import org.embulk.spi.util.InputStreamTransactionalFileInput;
 import org.jruby.embed.ScriptingContainer;
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
+
 public class HdfsFileInputPlugin implements FileInputPlugin
 {
     private static final Logger logger = Exec.getLogger(HdfsFileInputPlugin.class);
@@ -50,8 +50,12 @@ public interface PluginTask extends Task
         @ConfigDefault("0")
         public int getRewindSeconds();
 
-        public List<String> getFiles();
-        public void setFiles(List<String> hdfsFiles);
+        @Config("partition_size")
+        @ConfigDefault("-1")
+        public int getPartitionSize();
+
+        public List<HdfsPartialFile> getFiles();
+        public void setFiles(List<HdfsPartialFile> hdfsFiles);
 
         @ConfigInject
         public BufferAllocator getBufferAllocator();
@@ -65,17 +69,26 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr
         // listing Files
         String pathString = strftime(task.getInputPath(), task.getRewindSeconds());
         try {
-            // TODO: Create PartitionedFileList for using MultiThread
-            task.setFiles(buildFileList(getFs(task), pathString));
+            List<String> originalFileList = buildFileList(getFs(task), pathString);
+            HdfsFilePartitionManager partitionManager = new HdfsFilePartitionManager(
+                    getFs(task), originalFileList, task.getPartitionSize());
+            task.setFiles(partitionManager.getHdfsPartialFiles());
+            logger.info("Loading target files: {}", originalFileList);
         }
         catch (IOException e) {
             logger.error(e.getMessage());
             throw new RuntimeException(e);
         }
-        logger.info("Loading target files: {}", task.getFiles());
+
+        // log the detail of partial files.
+        for (HdfsPartialFile partialFile : task.getFiles()) {
+            logger.info("target file: {}, start: {}, end: {}",
+                    partialFile.getPath().toString(), partialFile.getStart(), partialFile.getEnd());
+        }
 
         // number of processors is same with number of targets
         int taskCount = task.getFiles().size();
+        logger.info("task size: {}", taskCount);
 
         return resume(task.dump(), taskCount, control);
     }
@@ -89,7 +102,7 @@ public ConfigDiff resume(TaskSource taskSource,
 
         ConfigDiff configDiff = Exec.newConfigDiff();
 
-        // usually, yo uset last_path
+        // usually, yo use last_path
         //if (task.getFiles().isEmpty()) {
         //    if (task.getLastPath().isPresent()) {
         //        configDiff.set("last_path", task.getLastPath().get());
@@ -137,10 +150,11 @@ public TaskReport commit()
         };
     }
 
-    // TODO: use PartitionedFileInputStream
-    private static InputStream openInputStream(PluginTask task, String path) throws IOException
+    private static HdfsPartialFileInputStream openInputStream(PluginTask task, HdfsPartialFile partialFile) throws IOException
     {
-        return getFs(task).open(new Path(path));
+        FileSystem fs = getFs(task);
+        InputStream original = fs.open(partialFile.getPath());
+        return new HdfsPartialFileInputStream(original, partialFile.getStart(), partialFile.getEnd());
     }
 
     private static FileSystem getFs(final PluginTask task) throws IOException {

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java
new file mode 100644
index 0000000..fa962c8
--- /dev/null
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java
@@ -0,0 +1,63 @@
+package org.embulk.input.hdfs;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created by takahiro.nakayama on 8/20/15.
+ */
+class HdfsFilePartitionManager
+{
+    private FileSystem fs;
+    private List<Path> fileList;
+    private int partitionSize;
+    private final int partitionParameter = 2;
+
+    public HdfsFilePartitionManager(FileSystem fs, List<String> fileList, int partitionSize)
+    {
+        this.fs = fs;
+        this.fileList = Lists.transform(fileList, new Function<String, Path>() {
+            @Nullable
+            @Override
+            public Path apply(String filePathString) {
+                return new Path(filePathString);
+            }
+        });
+        this.partitionSize = partitionSize;
+    }
+
+    public List<HdfsPartialFile> getHdfsPartialFiles() throws IOException
+    {
+        if (partitionSize <= 0) {
+            long fileLengthSum = 0;
+            for (Path path : fileList) {
+                fileLengthSum += fs.getFileStatus(path).getLen();
+            }
+            partitionSize = (int) (
+                    fileLengthSum / (Runtime.getRuntime().availableProcessors() * partitionParameter));
+        }
+
+        List<HdfsPartialFile> hdfsPartialFiles = new ArrayList<>();
+        for (Path path : fileList) {
+            int fileLength = (int) fs.getFileStatus(path).getLen();
+            int partitionNum = fileLength / partitionSize;
+            int remainder = fileLength % partitionNum;
+
+            if (remainder > 0) {
+                partitionNum++;
+            }
+
+            HdfsFilePartitioner partitioner = new HdfsFilePartitioner(fs, path, partitionNum);
+            hdfsPartialFiles.addAll(partitioner.getHdfsPartialFiles());
+        }
+
+        return hdfsPartialFiles;
+    }
+}

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java
new file mode 100644
index 0000000..f79f1c7
--- /dev/null
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java
@@ -0,0 +1,39 @@
+package org.embulk.input.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created by takahiro.nakayama on 8/20/15.
+ */
+class HdfsFilePartitioner
+{
+    private FileSystem fs;
+    private Path path;
+    private int partitionNum;
+
+    public HdfsFilePartitioner(FileSystem fs, Path path, int partitionNum)
+    {
+        this.fs = fs;
+        this.path = path;
+        this.partitionNum = partitionNum;
+    }
+
+    public List<HdfsPartialFile> getHdfsPartialFiles() throws IOException
+    {
+        List<HdfsPartialFile> hdfsPartialFiles = new ArrayList<>();
+        long size = fs.getFileStatus(path).getLen();
+        for (int i = 0; i < partitionNum; i++) {
+            long start = size * i / partitionNum;
+            long end = size * (i + 1) / partitionNum;
+            if (start < end) {
+                hdfsPartialFiles.add(new HdfsPartialFile(path, start, end));
+            }
+        }
+        return hdfsPartialFiles;
+    }
+}
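Before moving on to the partial-file plumbing, the split arithmetic in HdfsFilePartitioner above deserves a worked example: integer division spreads the remainder across the chunks, adjacent chunks share a boundary exactly once, and the start < end guard drops empty chunks when partitionNum exceeds the file size. A self-contained sketch of the same formula (file size and split count invented for illustration):

    public class PartitionBoundarySketch
    {
        public static void main(String[] args)
        {
            long size = 10;       // hypothetical file length in bytes
            int partitionNum = 3; // desired number of splits

            // Same arithmetic as HdfsFilePartitioner#getHdfsPartialFiles:
            for (int i = 0; i < partitionNum; i++) {
                long start = size * i / partitionNum;
                long end = size * (i + 1) / partitionNum;
                if (start < end) {
                    // Prints [0, 3), [3, 6), [6, 10): contiguous, covers every byte
                    // exactly once, no chunk longer than ceil(size / partitionNum).
                    System.out.printf("partition %d: [%d, %d)%n", i, start, end);
                }
            }
        }
    }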
https://github.com/hito4t/embulk-input-filesplit/blob/master/src/main/java/org/embulk/input/filesplit/PartialFile.java +class HdfsPartialFile +{ + private Path path; + private long start; + private long end; + + public HdfsPartialFile(Path path, long start, long end) + { + this.path = path; + this.start = start; + this.end = end; + } + + public Path getPath() + { + return path; + } + + public long getStart() + { + return start; + } + + public long getEnd() + { + return end; + } + +} diff --git a/src/main/java/org/embulk/input/hdfs/HdfsPartialFileInputStream.java b/src/main/java/org/embulk/input/hdfs/HdfsPartialFileInputStream.java new file mode 100644 index 0000000..2de2424 --- /dev/null +++ b/src/main/java/org/embulk/input/hdfs/HdfsPartialFileInputStream.java @@ -0,0 +1,154 @@ +package org.embulk.input.hdfs; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PushbackInputStream; + +// ref. https://github.com/hito4t/embulk-input-filesplit/blob/master/src/main/java/org/embulk/input/filesplit/PartialFileInputStream.java +class HdfsPartialFileInputStream extends InputStream +{ + private final PushbackInputStream original; + private long start; + private long end; + private long current; + private boolean eof; + + public HdfsPartialFileInputStream(InputStream original, long start, long end) + { + this.original = new PushbackInputStream(new BufferedInputStream(original)); + this.start = start; + this.end = end; + current = -1; + } + + @Override + public int read(byte[] b) throws IOException + { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + initializeIfNeeded(); + + if (eof) { + return -1; + } + + int read = original.read(b, off, len); + if (read < 0) { + eof = true; + return -1; + } + + current += read; + if (current >= end) { + for (int i = Math.max((int)(end - 1 - current + read), 0); i < read; i++) { + if (b[off + i] == '\n') { + eof = true; + return i + 1; + } + + if (b[off + i] == '\r') { + int next = (i < read ? 
b[off + i + 1] : prefetch()); + if (next != '\n') { + eof = true; + return i + 1; + } + } + } + } + + return read; + } + + @Override + public int read() throws IOException + { + initializeIfNeeded(); + + if (eof) { + return -1; + } + + int read = original.read(); + current++; + + if (read < 0) { + eof = true; + return -1; + } + + if (current >= end) { + if (read == '\n' || read == '\r' && prefetch() != '\n') { + eof = true; + } + } + + return read; + } + + @Override + public long skip(long n) throws IOException + { + throw new IOException("Skip not supported."); + /* + long skip = original.skip(n); + current += skip; + return skip; + */ + } + + @Override + public int available() throws IOException + { + return 0; + } + + @Override + public void close() throws IOException + { + original.close(); + } + + private void initializeIfNeeded() throws IOException + { + if (current >= start) { + return; + + } + if (start == 0) { + current = 0; + } else { + current = original.skip(--start); + if (current != start) { + throw new IOException("Cannot skip."); + } + + int c; + while ((c = original.read()) >= 0) { + start++; + current++; + + if (c == '\n' || c == '\r' && prefetch() != '\n') { + break; + } + } + } + + if (start >= end) { + eof = true; + } + } + + private int prefetch() throws IOException + { + int c = original.read(); + if (c >= 0) { + original.unread(c); + } + return c; + } +} \ No newline at end of file From 7d1ef8aba61622db60fe8bec4f0ce3efef68d010 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 31 Aug 2015 04:25:07 +0900 Subject: [PATCH 08/13] change private class to public because embulk calls. --- .../java/org/embulk/input/hdfs/HdfsFilePartitionManager.java | 2 +- src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java | 2 +- src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java | 2 +- .../java/org/embulk/input/hdfs/HdfsPartialFileInputStream.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java index fa962c8..7dfc312 100644 --- a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java +++ b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java @@ -13,7 +13,7 @@ /** * Created by takahiro.nakayama on 8/20/15. */ -class HdfsFilePartitionManager +public class HdfsFilePartitionManager { private FileSystem fs; private List fileList; diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java index f79f1c7..bf136e0 100644 --- a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java +++ b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java @@ -10,7 +10,7 @@ /** * Created by takahiro.nakayama on 8/20/15. */ -class HdfsFilePartitioner +public class HdfsFilePartitioner { private FileSystem fs; private Path path; diff --git a/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java b/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java index 4846af7..250701f 100644 --- a/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java +++ b/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java @@ -6,7 +6,7 @@ * Created by takahiro.nakayama on 8/20/15. */ // ref. 
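How the stream above keeps records intact is the crux of the whole patch: a reader whose range does not start at byte 0 first skips to the character just past the next line terminator, and every reader keeps consuming past its end offset until it finishes the line it is inside, so each line lands in exactly one task. A small usage sketch (same package as the patch code; the sample bytes and expected outputs are made up for illustration):

    package org.embulk.input.hdfs;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class PartialReadSketch
    {
        public static void main(String[] args) throws IOException
        {
            byte[] data = "aaa\nbbb\nccc\n".getBytes("UTF-8");

            // The ranges [0, 6) and [6, 12) cut the middle line in two, yet:
            System.out.print(readAll(new HdfsPartialFileInputStream(
                    new ByteArrayInputStream(data), 0, 6)));  // "aaa\nbbb\n" -- reads past 6 to finish the line
            System.out.print(readAll(new HdfsPartialFileInputStream(
                    new ByteArrayInputStream(data), 6, 12))); // "ccc\n" -- skips the half line it starts in
        }

        private static String readAll(InputStream in) throws IOException
        {
            StringBuilder sb = new StringBuilder();
            int c;
            while ((c = in.read()) >= 0) {
                sb.append((char) c);
            }
            in.close();
            return sb.toString();
        }
    }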
From 7d1ef8aba61622db60fe8bec4f0ce3efef68d010 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Mon, 31 Aug 2015 04:25:07 +0900
Subject: [PATCH 08/13] change private classes to public because Embulk calls
 them.

---
 .../java/org/embulk/input/hdfs/HdfsFilePartitionManager.java   | 2 +-
 src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java   | 2 +-
 src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java       | 2 +-
 .../java/org/embulk/input/hdfs/HdfsPartialFileInputStream.java | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java
index fa962c8..7dfc312 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java
@@ -13,7 +13,7 @@
 /**
  * Created by takahiro.nakayama on 8/20/15.
  */
-class HdfsFilePartitionManager
+public class HdfsFilePartitionManager
 {
     private FileSystem fs;
     private List<Path> fileList;

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java
index f79f1c7..bf136e0 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java
@@ -10,7 +10,7 @@
 /**
  * Created by takahiro.nakayama on 8/20/15.
  */
-class HdfsFilePartitioner
+public class HdfsFilePartitioner
 {
     private FileSystem fs;
     private Path path;

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java b/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java
index 4846af7..250701f 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java
@@ -6,7 +6,7 @@
 /**
  * Created by takahiro.nakayama on 8/20/15.
  */
 // ref. https://github.com/hito4t/embulk-input-filesplit/blob/master/src/main/java/org/embulk/input/filesplit/PartialFile.java
-class HdfsPartialFile
+public class HdfsPartialFile
 {
     private Path path;
     private long start;

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsPartialFileInputStream.java b/src/main/java/org/embulk/input/hdfs/HdfsPartialFileInputStream.java
index 2de2424..79e85f9 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsPartialFileInputStream.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsPartialFileInputStream.java
@@ -6,7 +6,7 @@
 import java.io.PushbackInputStream;
 
 // ref. https://github.com/hito4t/embulk-input-filesplit/blob/master/src/main/java/org/embulk/input/filesplit/PartialFileInputStream.java
-class HdfsPartialFileInputStream extends InputStream
+public class HdfsPartialFileInputStream extends InputStream
 {
     private final PushbackInputStream original;
     private long start;
From 4f28208e6ca1670447f8fb0bda09d8bfdbcb2146 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Mon, 31 Aug 2015 04:39:10 +0900
Subject: [PATCH 09/13] I want to resolve the error:
 `com.fasterxml.jackson.databind.JsonMappingException: No suitable
 constructor found for type [simple type, class org.apache.hadoop.fs.Path]:
 can not instantiate from JSON object (missing default constructor or
 creator, or perhaps need to add/enable type information?)`

---
 .../java/org/embulk/input/hdfs/HdfsFileInputPlugin.java  | 4 ++--
 .../java/org/embulk/input/hdfs/HdfsFilePartitioner.java  | 2 +-
 src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java | 9 ++++++---
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
index 3e443a1..4483f69 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
@@ -83,7 +83,7 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr
         // log the detail of partial files.
         for (HdfsPartialFile partialFile : task.getFiles()) {
             logger.info("target file: {}, start: {}, end: {}",
-                    partialFile.getPath().toString(), partialFile.getStart(), partialFile.getEnd());
+                    partialFile.getPath(), partialFile.getStart(), partialFile.getEnd());
         }
 
@@ -153,7 +153,7 @@
     private static HdfsPartialFileInputStream openInputStream(PluginTask task, HdfsPartialFile partialFile) throws IOException
     {
         FileSystem fs = getFs(task);
-        InputStream original = fs.open(partialFile.getPath());
+        InputStream original = fs.open(new Path(partialFile.getPath()));
         return new HdfsPartialFileInputStream(original, partialFile.getStart(), partialFile.getEnd());
     }

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java
index bf136e0..efb9af3 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java
@@ -31,7 +31,7 @@ public List<HdfsPartialFile> getHdfsPartialFiles() throws IOException
             long start = size * i / partitionNum;
             long end = size * (i + 1) / partitionNum;
             if (start < end) {
-                hdfsPartialFiles.add(new HdfsPartialFile(path, start, end));
+                hdfsPartialFiles.add(new HdfsPartialFile(path.toString(), start, end));
             }
         }
         return hdfsPartialFiles;

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java b/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java
index 250701f..f71ef8c 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java
@@ -8,18 +8,21 @@
 // ref. https://github.com/hito4t/embulk-input-filesplit/blob/master/src/main/java/org/embulk/input/filesplit/PartialFile.java
 public class HdfsPartialFile
 {
-    private Path path;
+    private String path;
     private long start;
     private long end;
 
-    public HdfsPartialFile(Path path, long start, long end)
+    public HdfsPartialFile(String path, long start, long end)
     {
         this.path = path;
         this.start = start;
         this.end = end;
     }
 
-    public Path getPath()
+    // see: http://stackoverflow.com/questions/7625783/jsonmappingexception-no-suitable-constructor-found-for-type-simple-type-class
+    public HdfsPartialFile() { }
+
+    public String getPath()
     {
         return path;
     }
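The error in this commit message comes from Embulk's task serialization: task state is round-tripped through Jackson, and Jackson can only rebuild a value type that exposes a default constructor or a creator, which org.apache.hadoop.fs.Path does not. Storing the path as a plain String (plus the no-arg constructor) turns HdfsPartialFile into an ordinary bean. A hypothetical minimal reproduction of the failure, using only stock Jackson:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.hadoop.fs.Path;

    public class JacksonPathSketch
    {
        public static void main(String[] args) throws Exception
        {
            ObjectMapper mapper = new ObjectMapper();

            // Throws com.fasterxml.jackson.databind.JsonMappingException:
            //   No suitable constructor found for type
            //   [simple type, class org.apache.hadoop.fs.Path]
            // because Path has neither a default constructor nor a @JsonCreator
            // for Jackson to call when materializing it from a JSON object.
            Path path = mapper.readValue("{\"name\":\"/tmp/example\"}", Path.class);
        }
    }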
From 1696657e258e2a30180f6e2d0e94f23524a398a7 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Tue, 8 Sep 2015 09:40:14 +0900
Subject: [PATCH 10/13] merge the partition manager logic into
 HdfsFileInputPlugin.java

This is because the partition logic can use PluginTask in order to change
the logic flexibly later.

Other changes:
- do not partition compressed files (supports only '.gz', '.lzo', '.bz2')
- support a no-partition option.
---
 .../input/hdfs/HdfsFileInputPlugin.java            | 66 +++++++++++++++----
 .../input/hdfs/HdfsFilePartitionManager.java       | 63 ------------------
 2 files changed, 52 insertions(+), 77 deletions(-)
 delete mode 100644 src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
index 4483f69..9a37bd3 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
@@ -6,6 +6,7 @@
 import java.util.ArrayList;
 import java.util.Map;
 
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -50,9 +51,9 @@ public interface PluginTask extends Task
         @ConfigDefault("0")
         public int getRewindSeconds();
 
-        @Config("partition_size")
-        @ConfigDefault("-1")
-        public int getPartitionSize();
+        @Config("partition")
+        @ConfigDefault("true")
+        public boolean getPartition();
 
         public List<HdfsPartialFile> getFiles();
         public void setFiles(List<HdfsPartialFile> hdfsFiles);
@@ -70,9 +71,7 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr
         String pathString = strftime(task.getInputPath(), task.getRewindSeconds());
         try {
             List<String> originalFileList = buildFileList(getFs(task), pathString);
-            HdfsFilePartitionManager partitionManager = new HdfsFilePartitionManager(
-                    getFs(task), originalFileList, task.getPartitionSize());
-            task.setFiles(partitionManager.getHdfsPartialFiles());
+            task.setFiles(allocateHdfsFilesToTasks(task, getFs(task), originalFileList));
             logger.info("Loading target files: {}", originalFileList);
         }
         catch (IOException e) {
@@ -207,13 +206,52 @@ private List<String> lsr(final FileSystem fs, FileStatus status) throws IOExcept
         return fileList;
     }
 
-// private List distinctFileList(List fileList)
-// {
-//     Set set = new HashSet();
-//     set.addAll(fileList);
-//     List uniqueFileList = new ArrayList();
-//     uniqueFileList.addAll(set);
-//     return uniqueFileList;
-// }
+    private List<HdfsPartialFile> allocateHdfsFilesToTasks(final PluginTask task, final FileSystem fs, final List<String> fileList)
+            throws IOException
+    {
+        List<Path> pathList = Lists.transform(fileList, new Function<String, Path>()
+        {
+            @Nullable
+            @Override
+            public Path apply(@Nullable String input)
+            {
+                return new Path(input);
+            }
+        });
+
+        int totalFileLength = 0;
+        for (Path path : pathList) {
+            totalFileLength += fs.getFileStatus(path).getLen();
+        }
+
+        // TODO: optimum allocation of resources
+        int partitionCountParameter = 3;
+        int partitionSizeByOneTask = totalFileLength / (Runtime.getRuntime().availableProcessors() * partitionCountParameter);
+
+        List<HdfsPartialFile> hdfsPartialFiles = new ArrayList<>();
+        for (Path path : pathList) {
+            int partitionCount;
+            if (path.toString().endsWith(".gz") || path.toString().endsWith(".bz2") || path.toString().endsWith(".lzo")) {
+                partitionCount = 1;
+            }
+            else if (!task.getPartition()) {
+                partitionCount = 1;
+            }
+            else {
+                int fileLength = (int) fs.getFileStatus(path).getLen();
+                partitionCount = fileLength / partitionSizeByOneTask;
+                int remainder = fileLength % partitionSizeByOneTask;
+
+                if (remainder > 0) {
+                    partitionCount++;
+                }
+            }
+
+            HdfsFilePartitioner partitioner = new HdfsFilePartitioner(fs, path, partitionCount);
+            hdfsPartialFiles.addAll(partitioner.getHdfsPartialFiles());
+        }
+
+        return hdfsPartialFiles;
+    }
 }

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java
deleted file mode 100644
index 7dfc312..0000000
--- a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitionManager.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.embulk.input.hdfs;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Created by takahiro.nakayama on 8/20/15.
- */
-public class HdfsFilePartitionManager
-{
-    private FileSystem fs;
-    private List<Path> fileList;
-    private int partitionSize;
-    private final int partitionParameter = 2;
-
-    public HdfsFilePartitionManager(FileSystem fs, List<String> fileList, int partitionSize)
-    {
-        this.fs = fs;
-        this.fileList = Lists.transform(fileList, new Function<String, Path>() {
-            @Nullable
-            @Override
-            public Path apply(String filePathString) {
-                return new Path(filePathString);
-            }
-        });
-        this.partitionSize = partitionSize;
-    }
-
-    public List<HdfsPartialFile> getHdfsPartialFiles() throws IOException
-    {
-        if (partitionSize <= 0) {
-            long fileLengthSum = 0;
-            for (Path path : fileList) {
-                fileLengthSum += fs.getFileStatus(path).getLen();
-            }
-            partitionSize = (int) (
-                    fileLengthSum / (Runtime.getRuntime().availableProcessors() * partitionParameter));
-        }
-
-        List<HdfsPartialFile> hdfsPartialFiles = new ArrayList<>();
-        for (Path path : fileList) {
-            int fileLength = (int) fs.getFileStatus(path).getLen();
-            int partitionNum = fileLength / partitionSize;
-            int remainder = fileLength % partitionNum;
-
-            if (remainder > 0) {
-                partitionNum++;
-            }
-
-            HdfsFilePartitioner partitioner = new HdfsFilePartitioner(fs, path, partitionNum);
-            hdfsPartialFiles.addAll(partitioner.getHdfsPartialFiles());
-        }
-
-        return hdfsPartialFiles;
-    }
-}
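To make the sizing in allocateHdfsFilesToTasks concrete: the target chunk size is the total input length divided by (available processors × partitionCountParameter), and each file then gets ceil(fileLength / chunkSize) chunks, unless it looks compressed or partition is false, in which case it stays whole. A standalone sketch of that arithmetic (file sizes and core count invented for illustration):

    public class AllocationSketch
    {
        public static void main(String[] args)
        {
            int[] fileLengths = {900, 300, 42}; // hypothetical input files, in bytes
            int cores = 4;                      // stands in for availableProcessors()
            int partitionCountParameter = 3;    // hard-coded in this commit; patch 12 exposes it as partition_level

            int totalFileLength = 0;
            for (int length : fileLengths) {
                totalFileLength += length;
            }
            // 1242 / (4 * 3) = 103 bytes per chunk target
            int partitionSizeByOneTask = totalFileLength / (cores * partitionCountParameter);

            for (int fileLength : fileLengths) {
                int partitionCount = fileLength / partitionSizeByOneTask;
                if (fileLength % partitionSizeByOneTask > 0) {
                    partitionCount++;
                }
                // 900 -> 9, 300 -> 3, 42 -> 1: small files stay whole, large ones
                // split so roughly a dozen chunks keep all cores busy.
                System.out.printf("%d bytes -> %d partition(s)%n", fileLength, partitionCount);
            }
        }
    }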
From 6ef1b2368a7f31d9ed13df6c56b40f7e75c07d67 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Tue, 8 Sep 2015 09:47:13 +0900
Subject: [PATCH 11/13] cosmetic change.

---
 .../org/embulk/input/hdfs/HdfsFileInputPlugin.java | 13 +++++++++----
 .../org/embulk/input/hdfs/HdfsFilePartitioner.java | 12 ++++++------
 2 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
index 9a37bd3..0ee8f5c 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
@@ -149,14 +149,17 @@ public TaskReport commit()
         };
     }
 
-    private static HdfsPartialFileInputStream openInputStream(PluginTask task, HdfsPartialFile partialFile) throws IOException
+    private static HdfsPartialFileInputStream openInputStream(PluginTask task, HdfsPartialFile partialFile)
+            throws IOException
     {
         FileSystem fs = getFs(task);
         InputStream original = fs.open(new Path(partialFile.getPath()));
         return new HdfsPartialFileInputStream(original, partialFile.getStart(), partialFile.getEnd());
     }
 
-    private static FileSystem getFs(final PluginTask task) throws IOException {
+    private static FileSystem getFs(final PluginTask task)
+            throws IOException
+    {
         Configuration configuration = new Configuration();
 
         for (Object configFile : task.getConfigFiles()) {
@@ -179,7 +182,8 @@ private String strftime(final String raw, final int rewind_seconds)
         return resolved.toString();
     }
 
-    private List<String> buildFileList(final FileSystem fs, final String pathString) throws IOException
+    private List<String> buildFileList(final FileSystem fs, final String pathString)
+            throws IOException
     {
         List<String> fileList = new ArrayList<>();
         for (FileStatus entry : fs.globStatus(new Path(pathString))) {
@@ -192,7 +196,8 @@ private List<String> buildFileList(final FileSystem fs, final String pathString)
         return fileList;
     }
 
-    private List<String> lsr(final FileSystem fs, FileStatus status) throws IOException
+    private List<String> lsr(final FileSystem fs, FileStatus status)
+            throws IOException
     {
         List<String> fileList = new ArrayList<>();
         if (status.isDirectory()) {

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java
index efb9af3..ce6c36d 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java
@@ -14,22 +14,22 @@ public class HdfsFilePartitioner
 {
     private FileSystem fs;
     private Path path;
-    private int partitionNum;
+    private int partitionCount;
 
-    public HdfsFilePartitioner(FileSystem fs, Path path, int partitionNum)
+    public HdfsFilePartitioner(FileSystem fs, Path path, int partitionCount)
     {
         this.fs = fs;
         this.path = path;
-        this.partitionNum = partitionNum;
+        this.partitionCount = partitionCount;
     }
 
     public List<HdfsPartialFile> getHdfsPartialFiles() throws IOException
     {
         List<HdfsPartialFile> hdfsPartialFiles = new ArrayList<>();
         long size = fs.getFileStatus(path).getLen();
-        for (int i = 0; i < partitionNum; i++) {
-            long start = size * i / partitionNum;
-            long end = size * (i + 1) / partitionNum;
+        for (int i = 0; i < partitionCount; i++) {
+            long start = size * i / partitionCount;
+            long end = size * (i + 1) / partitionCount;
             if (start < end) {
                 hdfsPartialFiles.add(new HdfsPartialFile(path.toString(), start, end));
             }
         }

From 14a7b76d3d9906172a5989d378c45303cb6b4d22 Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Tue, 8 Sep 2015 10:03:25 +0900
Subject: [PATCH 12/13] parameterize partitionCountParameter as
 "partition_level".

---
 .../java/org/embulk/input/hdfs/HdfsFileInputPlugin.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
index 0ee8f5c..80b1ff7 100644
--- a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
+++ b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
@@ -55,6 +55,11 @@ public interface PluginTask extends Task
         @ConfigDefault("true")
         public boolean getPartition();
 
+        // this parameter is experimental.
+        @Config("partition_level")
+        @ConfigDefault("3")
+        public int getPartitonLevel();
+
         public List<HdfsPartialFile> getFiles();
         public void setFiles(List<HdfsPartialFile> hdfsFiles);
 
@@ -230,7 +235,7 @@ public Path apply(@Nullable String input)
         }
 
         // TODO: optimum allocation of resources
-        int partitionCountParameter = 3;
+        int partitionCountParameter = task.getPartitonLevel();
         int partitionSizeByOneTask = totalFileLength / (Runtime.getRuntime().availableProcessors() * partitionCountParameter);

From 519576c06f36474c0cfab4ff9469f6188358a1ab Mon Sep 17 00:00:00 2001
From: Civitaspo
Date: Tue, 8 Sep 2015 10:16:55 +0900
Subject: [PATCH 13/13] update README

---
 README.md | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index ed711a1..058a752 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,7 @@ Read files on Hdfs.
 - **config** overwrites configuration parameters (hash, default: `{}`)
 - **input_path** file path on Hdfs. you can use glob and Date format like `%Y%m%d/%s`.
 - **rewind_seconds** When you use Date format in input_path property, the format is executed by using the time which is Now minus this property.
+- **partition** when this is true, partition input files and increase task count. (default: `true`)
 
 ## Example
 
@@ -22,32 +23,43 @@
 ```yaml
 in:
   type: hdfs
   config_files:
     - /opt/analytics/etc/hadoop/conf/core-site.xml
     - /opt/analytics/etc/hadoop/conf/hdfs-site.xml
   config:
-    fs.defaultFS: 'hdfs://hdp-nn1:8020'
+    fs.defaultFS: 'hdfs://hadoop-nn1:8020'
     dfs.replication: 1
     fs.hdfs.impl: 'org.apache.hadoop.hdfs.DistributedFileSystem'
     fs.file.impl: 'org.apache.hadoop.fs.LocalFileSystem'
   input_path: /user/embulk/test/%Y-%m-%d/*
   rewind_seconds: 86400
+  partition: true
   decoders:
   - {type: gzip}
   parser:
     charset: UTF-8
     newline: CRLF
     type: csv
    delimiter: ','
     quote: ''
     header_line: true
     columns:
     - {name: c0, type: string}
     - {name: c1, type: string}
     - {name: c2, type: string}
     - {name: c3, type: long}
 ```
 
+## Note
+- the feature of the partition supports only 3 line terminators.
+  - `\n`
+  - `\r`
+  - `\r\n`
+
+## The Reference Implementation
+- [hito4t/embulk-input-filesplit](https://github.com/hito4t/embulk-input-filesplit)
+
 ## Build
 
 ```