diff --git a/.gitignore b/.gitignore index 3edd668..eab43f9 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ build/ .idea *.iml +.ruby-version + diff --git a/README.md b/README.md index ed711a1..058a752 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Read files on Hdfs. - **config** overwrites configuration parameters (hash, default: `{}`) - **input_path** file path on Hdfs. you can use glob and Date format like `%Y%m%d/%s`. - **rewind_seconds** When you use Date format in input_path property, the format is executed by using the time which is Now minus this property. +- **partition** when this is true, input files are partitioned to increase the task count. (default: `true`) ## Example @@ -24,12 +25,13 @@ in: - /opt/analytics/etc/hadoop/conf/core-site.xml - /opt/analytics/etc/hadoop/conf/hdfs-site.xml config: - fs.defaultFS: 'hdfs://hdp-nn1:8020' + fs.defaultFS: 'hdfs://hadoop-nn1:8020' dfs.replication: 1 fs.hdfs.impl: 'org.apache.hadoop.hdfs.DistributedFileSystem' fs.file.impl: 'org.apache.hadoop.fs.LocalFileSystem' input_path: /user/embulk/test/%Y-%m-%d/* rewind_seconds: 86400 + partition: true decoders: - {type: gzip} parser: @@ -50,6 +52,15 @@ in: - {name: c3, type: long} ``` +## Note +- the partition feature supports only the following 3 line terminators: + - `\n` + - `\r` + - `\r\n` + +## Reference Implementation +- [hito4t/embulk-input-filesplit](https://github.com/hito4t/embulk-input-filesplit) + ## Build ``` diff --git a/build.gradle b/build.gradle index a3e52a1..53cc193 100644 --- a/build.gradle +++ b/build.gradle @@ -12,7 +12,7 @@ configurations { provided } -version = "0.0.3" +version = "0.1.0" sourceCompatibility = 1.7 targetCompatibility = 1.7 @@ -22,7 +22,7 @@ dependencies { provided "org.embulk:embulk-core:0.7.0" // compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION" compile 'org.apache.hadoop:hadoop-client:2.6.0' - compile 'com.google.guava:guava:14.0' + compile 'com.google.guava:guava:15.0' testCompile "junit:junit:4.+" } diff --git a/lib/embulk/input/hdfs.rb b/lib/embulk/input/hdfs.rb index 237ebfd..e658a75 100644 --- a/lib/embulk/input/hdfs.rb +++ b/lib/embulk/input/hdfs.rb @@ -1,3 +1,3 @@ Embulk::JavaPlugin.register_input( - "hdfs", "org.embulk.input.HdfsFileInputPlugin", + "hdfs", "org.embulk.input.hdfs.HdfsFileInputPlugin", File.expand_path('../../../../classpath', __FILE__)) diff --git a/src/main/java/org/embulk/input/HdfsFileInputPlugin.java b/src/main/java/org/embulk/input/HdfsFileInputPlugin.java deleted file mode 100644 index 0dfff22..0000000 --- a/src/main/java/org/embulk/input/HdfsFileInputPlugin.java +++ /dev/null @@ -1,231 +0,0 @@ -package org.embulk.input; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.embulk.config.*; -import org.embulk.spi.BufferAllocator; -import org.embulk.spi.Exec; -import org.embulk.spi.FileInputPlugin; -import org.embulk.spi.TransactionalFileInput; -import org.embulk.spi.util.InputStreamFileInput; -import org.jruby.embed.ScriptingContainer; -import org.slf4j.Logger; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -public class HdfsFileInputPlugin implements FileInputPlugin -{ - private static final Logger logger = Exec.getLogger(HdfsFileInputPlugin.class); - - public interface PluginTask extends Task - { - @Config("config_files") - @ConfigDefault("[]") - public List 
getConfigFiles(); - - @Config("config") - @ConfigDefault("{}") - public Map getConfig(); - - @Config("input_path") - public String getInputPath(); - - @Config("rewind_seconds") - @ConfigDefault("0") - public int getRewindSeconds(); - - public List getTargetFiles(); - public void setTargetFiles(List targetFiles); - - @ConfigInject - public BufferAllocator getBufferAllocator(); - } - - @Override - public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) - { - PluginTask task = config.loadConfig(PluginTask.class); - - // prepare - Configuration configuration = getHdfsConfiguration(task); - FileSystem fs = getFs(configuration); - Path inputPath = new Path(strftime(task.getInputPath(), task.getRewindSeconds())); - - // listing - List targetFiles; - try { - targetFiles = globRecursive(fs, inputPath); - } catch (IOException e) { - logger.error(e.getMessage()); - throw new RuntimeException(e); - } - logger.info("Loading target files: {}", targetFiles); - task.setTargetFiles(targetFiles); - - // number of processors is same with number of targets - int taskCount = targetFiles.size(); - - return resume(task.dump(), taskCount, control); - } - - @Override - public ConfigDiff resume(TaskSource taskSource, - int taskCount, - FileInputPlugin.Control control) - { - control.run(taskSource, taskCount); - return Exec.newConfigDiff(); - } - - @Override - public void cleanup(TaskSource taskSource, - int taskCount, - List successTaskReports) - { - } - - @Override - public TransactionalFileInput open(TaskSource taskSource, int taskIndex) - { - PluginTask task = taskSource.loadTask(PluginTask.class); - - // prepare - Configuration configuration = getHdfsConfiguration(task); - FileSystem fs = getFs(configuration); - - return new HdfsFileInput(task, fs, taskIndex); - } - - private Configuration getHdfsConfiguration(final PluginTask task) - { - Configuration configuration = new Configuration(); - - for (Object configFile : task.getConfigFiles()) { - configuration.addResource(configFile.toString()); - } - configuration.reloadConfiguration(); - - for (Map.Entry entry: task.getConfig().entrySet()) { - configuration.set(entry.getKey(), entry.getValue()); - } - - return configuration; - } - - private FileSystem getFs(final Configuration configuration) - { - try { - FileSystem fs = FileSystem.get(configuration); - return fs; - } - catch (IOException e) { - logger.error(e.getMessage()); - throw new RuntimeException(e); - } - } - - private String strftime(final String raw, final int rewind_seconds) - { - ScriptingContainer jruby = new ScriptingContainer(); - Object resolved = jruby.runScriptlet( - String.format("(Time.now - %s).strftime('%s')", String.valueOf(rewind_seconds), raw)); - return resolved.toString(); - } - - private List globRecursive(final FileSystem fs, final Path hdfsPath) throws IOException - { - List container = new ArrayList(); - for (FileStatus entry : fs.globStatus(hdfsPath)) { - if (entry.isDirectory()) { - container.addAll(listRecursive(fs, entry)); - } - else { - container.add(entry.getPath().toString()); - } - } - return container; - } - - private List listRecursive(final FileSystem fs, FileStatus status) throws IOException { - List container = new ArrayList(); - if (status.isDirectory()) { - for (FileStatus entry : fs.listStatus(status.getPath())) { - container.addAll(listRecursive(fs, entry)); - } - } - else { - container.add(status.getPath().toString()); - } - return container; - } - - - -// private List listUniquify(List stringList) -// { -// Set set = new HashSet(); 
-// set.addAll(stringList); -// List uniqueStringList = new ArrayList(); -// uniqueStringList.addAll(set); -// return uniqueStringList; -// } - - public static class HdfsFileInput extends InputStreamFileInput implements TransactionalFileInput - { - private static class HdfsFileProvider implements InputStreamFileInput.Provider - { - private final FileSystem fs; - private final Path hdfsPath; - private boolean opened = false; - - public HdfsFileProvider(PluginTask task, FileSystem fs, int taskIndex) - { - this.fs = fs; - this.hdfsPath = new Path(task.getTargetFiles().get(taskIndex)); - } - - @Override - public InputStream openNext() throws IOException - { - if (opened) { - return null; - } - - opened = true; - return fs.open(hdfsPath); - } - - @Override - public void close() - { - } - } - - public HdfsFileInput(PluginTask task, FileSystem fs, int taskIndex) - { - super(task.getBufferAllocator(), new HdfsFileProvider(task, fs, taskIndex)); - } - - @Override - public void close() - { - } - - @Override - public void abort() - { - } - - @Override - public TaskReport commit() - { - return Exec.newTaskReport(); - } - } -} diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java new file mode 100644 index 0000000..80b1ff7 --- /dev/null +++ b/src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java @@ -0,0 +1,267 @@ +package org.embulk.input.hdfs; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.embulk.config.TaskReport; +import org.embulk.config.Config; +import org.embulk.config.ConfigDefault; +import org.embulk.config.ConfigInject; +import org.embulk.config.ConfigDiff; +import org.embulk.config.ConfigSource; +import org.embulk.config.Task; +import org.embulk.config.TaskSource; +import org.embulk.spi.*; +import org.embulk.spi.util.InputStreamFileInput; +import org.embulk.spi.util.InputStreamTransactionalFileInput; +import org.jruby.embed.ScriptingContainer; +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +public class HdfsFileInputPlugin implements FileInputPlugin +{ + private static final Logger logger = Exec.getLogger(HdfsFileInputPlugin.class); + + public interface PluginTask extends Task + { + @Config("config_files") + @ConfigDefault("[]") + public List getConfigFiles(); + + @Config("config") + @ConfigDefault("{}") + public Map getConfig(); + + @Config("input_path") + public String getInputPath(); + + @Config("rewind_seconds") + @ConfigDefault("0") + public int getRewindSeconds(); + + @Config("partition") + @ConfigDefault("true") + public boolean getPartition(); + + // this parameter is experimental. 
+ @Config("partition_level") + @ConfigDefault("3") + public int getPartitonLevel(); + + public List<HdfsPartialFile> getFiles(); + public void setFiles(List<HdfsPartialFile> hdfsFiles); + + @ConfigInject + public BufferAllocator getBufferAllocator(); + } + + @Override + public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) + { + PluginTask task = config.loadConfig(PluginTask.class); + + // list target files + String pathString = strftime(task.getInputPath(), task.getRewindSeconds()); + try { + List<String> originalFileList = buildFileList(getFs(task), pathString); + task.setFiles(allocateHdfsFilesToTasks(task, getFs(task), originalFileList)); + logger.info("Loading target files: {}", originalFileList); + } + catch (IOException e) { + logger.error(e.getMessage()); + throw new RuntimeException(e); + } + + // log the details of the partial files. + for (HdfsPartialFile partialFile : task.getFiles()) { + logger.info("target file: {}, start: {}, end: {}", + partialFile.getPath(), partialFile.getStart(), partialFile.getEnd()); + } + + // the task count equals the number of partial files + int taskCount = task.getFiles().size(); + logger.info("task size: {}", taskCount); + + return resume(task.dump(), taskCount, control); + } + + @Override + public ConfigDiff resume(TaskSource taskSource, + int taskCount, + FileInputPlugin.Control control) + { + control.run(taskSource, taskCount); + + ConfigDiff configDiff = Exec.newConfigDiff(); + + // usually, you would use last_path here + //if (task.getFiles().isEmpty()) { + // if (task.getLastPath().isPresent()) { + // configDiff.set("last_path", task.getLastPath().get()); + // } + //} else { + // List files = new ArrayList(task.getFiles()); + // Collections.sort(files); + // configDiff.set("last_path", files.get(files.size() - 1)); + //} + + return configDiff; + } + + @Override + public void cleanup(TaskSource taskSource, + int taskCount, + List<TaskReport> successTaskReports) + { + } + + @Override + public TransactionalFileInput open(TaskSource taskSource, int taskIndex) + { + final PluginTask task = taskSource.loadTask(PluginTask.class); + + InputStream input; + try { + input = openInputStream(task, task.getFiles().get(taskIndex)); + } + catch (IOException e) { + logger.error(e.getMessage()); + throw new RuntimeException(e); + } + + return new InputStreamTransactionalFileInput(task.getBufferAllocator(), input) { + @Override + public void abort() + { } + + @Override + public TaskReport commit() + { + return Exec.newTaskReport(); + } + }; + } + + private static HdfsPartialFileInputStream openInputStream(PluginTask task, HdfsPartialFile partialFile) + throws IOException + { + FileSystem fs = getFs(task); + InputStream original = fs.open(new Path(partialFile.getPath())); + return new HdfsPartialFileInputStream(original, partialFile.getStart(), partialFile.getEnd()); + } + + private static FileSystem getFs(final PluginTask task) + throws IOException + { + Configuration configuration = new Configuration(); + + for (Object configFile : task.getConfigFiles()) { + configuration.addResource(configFile.toString()); + } + configuration.reloadConfiguration(); + + for (Map.Entry<String, String> entry : task.getConfig().entrySet()) { + configuration.set(entry.getKey(), entry.getValue()); + } + + return FileSystem.get(configuration); + } + + private String strftime(final String raw, final int rewind_seconds) + { + ScriptingContainer jruby = new ScriptingContainer(); + Object resolved = jruby.runScriptlet( + String.format("(Time.now - %s).strftime('%s')", String.valueOf(rewind_seconds), raw)); + return resolved.toString(); + } + + 
private List<String> buildFileList(final FileSystem fs, final String pathString) + throws IOException + { + List<String> fileList = new ArrayList<>(); + for (FileStatus entry : fs.globStatus(new Path(pathString))) { + if (entry.isDirectory()) { + fileList.addAll(lsr(fs, entry)); + } else { + fileList.add(entry.getPath().toString()); + } + } + return fileList; + } + + private List<String> lsr(final FileSystem fs, FileStatus status) + throws IOException + { + List<String> fileList = new ArrayList<>(); + if (status.isDirectory()) { + for (FileStatus entry : fs.listStatus(status.getPath())) { + fileList.addAll(lsr(fs, entry)); + } + } + else { + fileList.add(status.getPath().toString()); + } + return fileList; + } + + private List<HdfsPartialFile> allocateHdfsFilesToTasks(final PluginTask task, final FileSystem fs, final List<String> fileList) + throws IOException + { + List<Path> pathList = Lists.transform(fileList, new Function<String, Path>() + { + @Nullable + @Override + public Path apply(@Nullable String input) + { + return new Path(input); + } + }); + + int totalFileLength = 0; + for (Path path : pathList) { + totalFileLength += fs.getFileStatus(path).getLen(); + } + + // TODO: optimum allocation of resources + int partitionCountParameter = task.getPartitonLevel(); + int partitionSizeByOneTask = totalFileLength / (Runtime.getRuntime().availableProcessors() * partitionCountParameter); + + List<HdfsPartialFile> hdfsPartialFiles = new ArrayList<>(); + for (Path path : pathList) { + int partitionCount; + + if (path.toString().endsWith(".gz") || path.toString().endsWith(".bz2") || path.toString().endsWith(".lzo")) { + partitionCount = 1; + } + else if (!task.getPartition()) { + partitionCount = 1; + } + else { + int fileLength = (int) fs.getFileStatus(path).getLen(); + partitionCount = fileLength / partitionSizeByOneTask; + int remainder = fileLength % partitionSizeByOneTask; + + if (remainder > 0) { + partitionCount++; + } + } + + HdfsFilePartitioner partitioner = new HdfsFilePartitioner(fs, path, partitionCount); + hdfsPartialFiles.addAll(partitioner.getHdfsPartialFiles()); + } + + return hdfsPartialFiles; + } +} diff --git a/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java new file mode 100644 index 0000000..ce6c36d --- /dev/null +++ b/src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java @@ -0,0 +1,39 @@ +package org.embulk.input.hdfs; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Created by takahiro.nakayama on 8/20/15. 
+ */ +public class HdfsFilePartitioner +{ + private FileSystem fs; + private Path path; + private int partitionCount; + + public HdfsFilePartitioner(FileSystem fs, Path path, int partitionCount) + { + this.fs = fs; + this.path = path; + this.partitionCount = partitionCount; + } + + public List getHdfsPartialFiles() throws IOException + { + List hdfsPartialFiles = new ArrayList<>(); + long size = fs.getFileStatus(path).getLen(); + for (int i = 0; i < partitionCount; i++) { + long start = size * i / partitionCount; + long end = size * (i + 1) / partitionCount; + if (start < end) { + hdfsPartialFiles.add(new HdfsPartialFile(path.toString(), start, end)); + } + } + return hdfsPartialFiles; + } +} diff --git a/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java b/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java new file mode 100644 index 0000000..f71ef8c --- /dev/null +++ b/src/main/java/org/embulk/input/hdfs/HdfsPartialFile.java @@ -0,0 +1,40 @@ +package org.embulk.input.hdfs; + +import org.apache.hadoop.fs.Path; + +/** + * Created by takahiro.nakayama on 8/20/15. + */ +// ref. https://github.com/hito4t/embulk-input-filesplit/blob/master/src/main/java/org/embulk/input/filesplit/PartialFile.java +public class HdfsPartialFile +{ + private String path; + private long start; + private long end; + + public HdfsPartialFile(String path, long start, long end) + { + this.path = path; + this.start = start; + this.end = end; + } + + // see: http://stackoverflow.com/questions/7625783/jsonmappingexception-no-suitable-constructor-found-for-type-simple-type-class + public HdfsPartialFile() { } + + public String getPath() + { + return path; + } + + public long getStart() + { + return start; + } + + public long getEnd() + { + return end; + } + +} diff --git a/src/main/java/org/embulk/input/hdfs/HdfsPartialFileInputStream.java b/src/main/java/org/embulk/input/hdfs/HdfsPartialFileInputStream.java new file mode 100644 index 0000000..79e85f9 --- /dev/null +++ b/src/main/java/org/embulk/input/hdfs/HdfsPartialFileInputStream.java @@ -0,0 +1,154 @@ +package org.embulk.input.hdfs; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PushbackInputStream; + +// ref. https://github.com/hito4t/embulk-input-filesplit/blob/master/src/main/java/org/embulk/input/filesplit/PartialFileInputStream.java +public class HdfsPartialFileInputStream extends InputStream +{ + private final PushbackInputStream original; + private long start; + private long end; + private long current; + private boolean eof; + + public HdfsPartialFileInputStream(InputStream original, long start, long end) + { + this.original = new PushbackInputStream(new BufferedInputStream(original)); + this.start = start; + this.end = end; + current = -1; + } + + @Override + public int read(byte[] b) throws IOException + { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + initializeIfNeeded(); + + if (eof) { + return -1; + } + + int read = original.read(b, off, len); + if (read < 0) { + eof = true; + return -1; + } + + current += read; + if (current >= end) { + for (int i = Math.max((int)(end - 1 - current + read), 0); i < read; i++) { + if (b[off + i] == '\n') { + eof = true; + return i + 1; + } + + if (b[off + i] == '\r') { + int next = (i < read ? 
b[off + i + 1] : prefetch()); + if (next != '\n') { + eof = true; + return i + 1; + } + } + } + } + + return read; + } + + @Override + public int read() throws IOException + { + initializeIfNeeded(); + + if (eof) { + return -1; + } + + int read = original.read(); + current++; + + if (read < 0) { + eof = true; + return -1; + } + + if (current >= end) { + if (read == '\n' || read == '\r' && prefetch() != '\n') { + eof = true; + } + } + + return read; + } + + @Override + public long skip(long n) throws IOException + { + throw new IOException("Skip not supported."); + /* + long skip = original.skip(n); + current += skip; + return skip; + */ + } + + @Override + public int available() throws IOException + { + return 0; + } + + @Override + public void close() throws IOException + { + original.close(); + } + + private void initializeIfNeeded() throws IOException + { + if (current >= start) { + return; + + } + if (start == 0) { + current = 0; + } else { + current = original.skip(--start); + if (current != start) { + throw new IOException("Cannot skip."); + } + + int c; + while ((c = original.read()) >= 0) { + start++; + current++; + + if (c == '\n' || c == '\r' && prefetch() != '\n') { + break; + } + } + } + + if (start >= end) { + eof = true; + } + } + + private int prefetch() throws IOException + { + int c = original.read(); + if (c >= 0) { + original.unread(c); + } + return c; + } +} \ No newline at end of file diff --git a/src/test/java/org/embulk/input/TestHdfsFileInputPlugin.java b/src/test/java/org/embulk/input/hdfs/TestHdfsFileInputPlugin.java similarity index 57% rename from src/test/java/org/embulk/input/TestHdfsFileInputPlugin.java rename to src/test/java/org/embulk/input/hdfs/TestHdfsFileInputPlugin.java index 622e938..b8eb7c7 100644 --- a/src/test/java/org/embulk/input/TestHdfsFileInputPlugin.java +++ b/src/test/java/org/embulk/input/hdfs/TestHdfsFileInputPlugin.java @@ -1,4 +1,4 @@ -package org.embulk.input; +package org.embulk.input.hdfs; public class TestHdfsFileInputPlugin {
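Reviewer note: the sketch below is not part of the patch. It reproduces only the start/end arithmetic of `HdfsFilePartitioner#getHdfsPartialFiles` so the partitioning can be sanity-checked without a Hadoop cluster; the class name `PartitionRangeSketch` and the example `size`/`partitionCount` values are hypothetical. `HdfsPartialFileInputStream` then shifts each range forward to the next line terminator at read time, which is why only `\n`, `\r`, and `\r\n` are supported.

```java
// Hypothetical standalone sketch (not part of the patch): mirrors the
// start/end arithmetic of HdfsFilePartitioner#getHdfsPartialFiles without Hadoop.
public class PartitionRangeSketch
{
    public static void main(String[] args)
    {
        long size = 100;         // example file length in bytes
        int partitionCount = 3;  // example partition count

        for (int i = 0; i < partitionCount; i++) {
            long start = size * i / partitionCount;
            long end = size * (i + 1) / partitionCount;
            if (start < end) {
                // prints: partition 0: [0, 33), partition 1: [33, 66), partition 2: [66, 100)
                System.out.printf("partition %d: [%d, %d)%n", i, start, end);
            }
        }
    }
}
```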