diff --git a/.gitignore b/.gitignore index ec6db2f..2222ed6 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ build/ .idea *.iml example/config.yml +.vagrant diff --git a/README.md b/README.md index 83c74da..974421b 100644 --- a/README.md +++ b/README.md @@ -103,10 +103,21 @@ out: ``` ## Run Example -replace settings in `example/sample.yml` before running. ``` $ ./gradlew classpath +``` + +Use `vagrant` to start a remote sshd server: + +``` +$ vagrant up +``` + +Run: + + +``` $ embulk run -Ilib example/sample.yml ``` diff --git a/Vagrantfile b/Vagrantfile new file mode 100644 index 0000000..14f272b --- /dev/null +++ b/Vagrantfile @@ -0,0 +1,20 @@ +# -*- mode: ruby -*- +# vi: set ft=ruby : + +# Vagrantfile API/syntax version. Don't touch unless you know what you're doing! +VAGRANTFILE_API_VERSION = "2" + +Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| + # All Vagrant configuration is done here. The most common configuration + # options are documented and commented below. For a complete reference, + # please see the online documentation at vagrantup.com. + + # Every Vagrant virtual environment requires a box to build off of. + config.vm.box = "centos6.5.3" + config.vm.box_url = 'https://github.com/2creatives/vagrant-centos/releases/download/v6.5.3/centos65-x86_64-20140116.box' + + # name + config.vm.define "vagrant-centos" + config.ssh.forward_agent = true + config.vm.network "forwarded_port", guest: 22, host: 2210 +end diff --git a/example/sample.yml b/example/sample.yml.liquid similarity index 79% rename from example/sample.yml rename to example/sample.yml.liquid index bee13cc..1d87c82 100644 --- a/example/sample.yml +++ b/example/sample.yml.liquid @@ -16,11 +16,11 @@ in: out: type: sftp host: 127.0.0.1 - port: 22 - user: your_name - password: your_password - secret_key_file: your_secret_key_file - secret_key_passphrase: your_secret_key_passphrase + port: 2210 + user: vagrant + # password: + secret_key_file: {{ env.PWD }}/.vagrant/machines/vagrant-centos/virtualbox/private_key + # secret_key_passphrase: user_directory_is_root: true path_prefix: /tmp/embulk_output_sftp/data file_ext: .tsv @@ -36,4 +36,5 @@ out: quote: "\"" escape: "\\" null_string: "" - default_timezone: 'UTC' \ No newline at end of file + default_timezone: 'UTC' + diff --git a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java index 2acddb1..f9ac677 100644 --- a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java +++ b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java @@ -17,6 +17,7 @@ import org.embulk.spi.unit.LocalFile; import org.slf4j.Logger; +import java.lang.Void; import java.io.File; import java.io.IOException; import java.io.OutputStream; @@ -144,7 +145,7 @@ public void nextFile() try { currentFile = newSftpFile(getSftpFileUri(getOutputFilePath())); - currentFileOutputStream = currentFile.getContent().getOutputStream(); + currentFileOutputStream = newSftpOutputStream(currentFile); logger.info("new sftp file: {}", currentFile.getPublicURIString()); } catch (FileSystemException e) { @@ -154,14 +155,26 @@ public void nextFile() } @Override - public void add(Buffer buffer) + public void add(final Buffer buffer) { if (currentFile == null) { throw new IllegalStateException("nextFile() must be called before poll()"); } try { - currentFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit()); + Retriable retriable = new Retriable() { + public Void execute() throws IOException + { + currentFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit()); + return null; + } + }; + try { + withConnectionRetry(retriable); + } + catch (Exception e) { + throw (IOException)e; + } } catch (IOException e) { logger.error(e.getMessage()); @@ -204,18 +217,21 @@ private void closeCurrentFile() try { currentFileOutputStream.close(); - currentFile.getContent().close(); - currentFile.close(); } catch (IOException e) { - logger.error(e.getMessage()); - Throwables.propagate(e); + logger.info(e.getMessage()); } - finally { - fileIndex++; - currentFile = null; - currentFileOutputStream = null; + + try { + currentFile.close(); } + catch (FileSystemException e) { + logger.warn(e.getMessage()); + } + + fileIndex++; + currentFile = null; + currentFileOutputStream = null; } private URI getSftpFileUri(String remoteFilePath) @@ -234,24 +250,25 @@ private String getOutputFilePath() return pathPrefix + String.format(sequenceFormat, taskIndex, fileIndex) + fileNameExtension; } - private FileObject newSftpFile(URI sftpUri) - throws FileSystemException + interface Retriable { + /** + * Execute the operation with the given (or null) return value. + * + * @return any return value from the operation + * @throws Exception + */ + public T execute() throws Exception; + } + + private T withConnectionRetry( final Retriable op ) throws Exception { int count = 0; while (true) { try { - FileObject file = manager.resolveFile(sftpUri.toString(), fsOptions); - if (file.getParent().exists()) { - logger.info("parent directory {} exists there", file.getParent()); - return file; - } - else { - logger.info("trying to create parent directory {}", file.getParent()); - file.getParent().createFolder(); - } + return op.execute(); } - catch (FileSystemException e) { - if (++count == maxConnectionRetry) { + catch(final Exception e) { + if (++count > maxConnectionRetry) { throw e; } logger.warn("failed to connect sftp server: " + e.getMessage(), e); @@ -270,6 +287,48 @@ private FileObject newSftpFile(URI sftpUri) } } + private FileObject newSftpFile(final URI sftpUri) + throws FileSystemException + { + Retriable retriable = new Retriable() { + public FileObject execute() throws FileSystemException + { + FileObject file = manager.resolveFile(sftpUri.toString(), fsOptions); + if (file.getParent().exists()) { + logger.info("parent directory {} exists there", file.getParent()); + } + else { + logger.info("trying to create parent directory {}", file.getParent()); + file.getParent().createFolder(); + } + return file; + } + }; + try { + return withConnectionRetry(retriable); + } + catch (Exception e) { + throw (FileSystemException)e; + } + } + + private OutputStream newSftpOutputStream(final FileObject file) + throws FileSystemException + { + Retriable retriable = new Retriable() { + public OutputStream execute() throws FileSystemException + { + return file.getContent().getOutputStream(); + } + }; + try { + return withConnectionRetry(retriable); + } + catch (Exception e) { + throw (FileSystemException)e; + } + } + private Function localFileToPathString() { return new Function()