From e390974f3742d927983e476fd952a21008c5d0d8 Mon Sep 17 00:00:00 2001 From: sonots Date: Wed, 16 Mar 2016 15:17:22 +0900 Subject: [PATCH 1/5] Add Vagrantfile --- .gitignore | 1 + README.md | 13 ++++++++++++- Vagrantfile | 20 ++++++++++++++++++++ example/{sample.yml => sample.yml.liquid} | 13 +++++++------ 4 files changed, 40 insertions(+), 7 deletions(-) create mode 100644 Vagrantfile rename example/{sample.yml => sample.yml.liquid} (79%) diff --git a/.gitignore b/.gitignore index ec6db2f..2222ed6 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ build/ .idea *.iml example/config.yml +.vagrant diff --git a/README.md b/README.md index 83c74da..974421b 100644 --- a/README.md +++ b/README.md @@ -103,10 +103,21 @@ out: ``` ## Run Example -replace settings in `example/sample.yml` before running. ``` $ ./gradlew classpath +``` + +Use `vagrant` to start a remote sshd server: + +``` +$ vagrant up +``` + +Run: + + +``` $ embulk run -Ilib example/sample.yml ``` diff --git a/Vagrantfile b/Vagrantfile new file mode 100644 index 0000000..14f272b --- /dev/null +++ b/Vagrantfile @@ -0,0 +1,20 @@ +# -*- mode: ruby -*- +# vi: set ft=ruby : + +# Vagrantfile API/syntax version. Don't touch unless you know what you're doing! +VAGRANTFILE_API_VERSION = "2" + +Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| + # All Vagrant configuration is done here. The most common configuration + # options are documented and commented below. For a complete reference, + # please see the online documentation at vagrantup.com. + + # Every Vagrant virtual environment requires a box to build off of. + config.vm.box = "centos6.5.3" + config.vm.box_url = 'https://github.com/2creatives/vagrant-centos/releases/download/v6.5.3/centos65-x86_64-20140116.box' + + # name + config.vm.define "vagrant-centos" + config.ssh.forward_agent = true + config.vm.network "forwarded_port", guest: 22, host: 2210 +end diff --git a/example/sample.yml b/example/sample.yml.liquid similarity index 79% rename from example/sample.yml rename to example/sample.yml.liquid index bee13cc..1d87c82 100644 --- a/example/sample.yml +++ b/example/sample.yml.liquid @@ -16,11 +16,11 @@ in: out: type: sftp host: 127.0.0.1 - port: 22 - user: your_name - password: your_password - secret_key_file: your_secret_key_file - secret_key_passphrase: your_secret_key_passphrase + port: 2210 + user: vagrant + # password: + secret_key_file: {{ env.PWD }}/.vagrant/machines/vagrant-centos/virtualbox/private_key + # secret_key_passphrase: user_directory_is_root: true path_prefix: /tmp/embulk_output_sftp/data file_ext: .tsv @@ -36,4 +36,5 @@ out: quote: "\"" escape: "\\" null_string: "" - default_timezone: 'UTC' \ No newline at end of file + default_timezone: 'UTC' + From dbe3f9c9c339053ae6c38d02aa512a0b4e8950da Mon Sep 17 00:00:00 2001 From: sonots Date: Wed, 16 Mar 2016 15:57:19 +0900 Subject: [PATCH 2/5] retry getOutputStream --- .../embulk/output/sftp/SftpFileOutput.java | 76 +++++++++++++++---- 1 file changed, 60 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java index 2acddb1..f071160 100644 --- a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java +++ b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java @@ -144,7 +144,7 @@ public void nextFile() try { currentFile = newSftpFile(getSftpFileUri(getOutputFilePath())); - currentFileOutputStream = currentFile.getContent().getOutputStream(); + currentFileOutputStream = newSftpOutputStream(currentFile); logger.info("new sftp file: {}", currentFile.getPublicURIString()); } catch (FileSystemException e) { @@ -161,7 +161,19 @@ public void add(Buffer buffer) } try { - currentFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit()); + Retriable retriable = new Retriable() { + public Void execute() throws IOException + { + currentFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit()); + return null; + } + }; + try { + withConnectionRetry(retriable); + } + catch (Exception e) { + throw (IOException)e; + } } catch (IOException e) { logger.error(e.getMessage()); @@ -234,24 +246,24 @@ private String getOutputFilePath() return pathPrefix + String.format(sequenceFormat, taskIndex, fileIndex) + fileNameExtension; } - private FileObject newSftpFile(URI sftpUri) - throws FileSystemException + interface Retriable { + /** + * Execute the operation with the given (or null) return value. + * + * @return any return value from the operation + * @throws Exception + */ + public T execute() throws FileSystemException; + } + + private T withConnectionRetry( final Retriable op ) throws FileSystemException { int count = 0; while (true) { try { - FileObject file = manager.resolveFile(sftpUri.toString(), fsOptions); - if (file.getParent().exists()) { - logger.info("parent directory {} exists there", file.getParent()); - return file; - } - else { - logger.info("trying to create parent directory {}", file.getParent()); - file.getParent().createFolder(); - } - } - catch (FileSystemException e) { - if (++count == maxConnectionRetry) { + return op.execute(); + } catch(final FileSystemException e) { + if (++count > maxConnectionRetry) { throw e; } logger.warn("failed to connect sftp server: " + e.getMessage(), e); @@ -270,6 +282,38 @@ private FileObject newSftpFile(URI sftpUri) } } + private FileObject newSftpFile(final URI sftpUri) + throws FileSystemException + { + Retriable retriable = new Retriable() { + public FileObject execute() throws FileSystemException + { + FileObject file = manager.resolveFile(sftpUri.toString(), fsOptions); + if (file.getParent().exists()) { + logger.info("parent directory {} exists there", file.getParent()); + } + else { + logger.info("trying to create parent directory {}", file.getParent()); + file.getParent().createFolder(); + } + return file; + } + }; + return withConnectionRetry(retriable); + } + + private OutputStream newSftpOutputStream(final FileObject file) + throws FileSystemException + { + Retriable retriable = new Retriable() { + public OutputStream execute() throws FileSystemException + { + return file.getContent().getOutputStream(); + } + }; + return withConnectionRetry(retriable); + } + private Function localFileToPathString() { return new Function() From 8520263c266171395c71bb83ef14a606b6d0a93d Mon Sep 17 00:00:00 2001 From: sonots Date: Wed, 16 Mar 2016 16:38:48 +0900 Subject: [PATCH 3/5] retry to write --- .../embulk/output/sftp/SftpFileOutput.java | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java index f071160..56ba69d 100644 --- a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java +++ b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java @@ -17,6 +17,7 @@ import org.embulk.spi.unit.LocalFile; import org.slf4j.Logger; +import java.lang.Void; import java.io.File; import java.io.IOException; import java.io.OutputStream; @@ -154,7 +155,7 @@ public void nextFile() } @Override - public void add(Buffer buffer) + public void add(final Buffer buffer) { if (currentFile == null) { throw new IllegalStateException("nextFile() must be called before poll()"); @@ -171,7 +172,7 @@ public Void execute() throws IOException try { withConnectionRetry(retriable); } - catch (Exception e) { + } catch (Exception e) { throw (IOException)e; } } @@ -254,15 +255,15 @@ interface Retriable * @return any return value from the operation * @throws Exception */ - public T execute() throws FileSystemException; + public T execute() throws Exception; } - private T withConnectionRetry( final Retriable op ) throws FileSystemException { + private T withConnectionRetry( final Retriable op ) throws Exception { int count = 0; while (true) { try { return op.execute(); - } catch(final FileSystemException e) { + } catch(final Exception e) { if (++count > maxConnectionRetry) { throw e; } @@ -299,7 +300,11 @@ public FileObject execute() throws FileSystemException return file; } }; - return withConnectionRetry(retriable); + try { + return withConnectionRetry(retriable); + } catch (Exception e) { + throw (FileSystemException)e; + } } private OutputStream newSftpOutputStream(final FileObject file) @@ -311,7 +316,11 @@ public OutputStream execute() throws FileSystemException return file.getContent().getOutputStream(); } }; - return withConnectionRetry(retriable); + try { + return withConnectionRetry(retriable); + } catch (Exception e) { + throw (FileSystemException)e; + } } private Function localFileToPathString() From 109f9f9b861946703446df928aef270ccdcf8ac4 Mon Sep 17 00:00:00 2001 From: sonots Date: Wed, 16 Mar 2016 16:47:40 +0900 Subject: [PATCH 4/5] fix indent --- .../org/embulk/output/sftp/SftpFileOutput.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java index 56ba69d..388b988 100644 --- a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java +++ b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java @@ -172,7 +172,7 @@ public Void execute() throws IOException try { withConnectionRetry(retriable); } - } catch (Exception e) { + catch (Exception e) { throw (IOException)e; } } @@ -220,7 +220,8 @@ private void closeCurrentFile() currentFile.getContent().close(); currentFile.close(); } - catch (IOException e) { + catch (FileSystemException e) { + IOException e) { logger.error(e.getMessage()); Throwables.propagate(e); } @@ -263,7 +264,8 @@ private T withConnectionRetry( final Retriable op ) throws Exception { while (true) { try { return op.execute(); - } catch(final Exception e) { + } + catch(final Exception e) { if (++count > maxConnectionRetry) { throw e; } @@ -302,7 +304,8 @@ public FileObject execute() throws FileSystemException }; try { return withConnectionRetry(retriable); - } catch (Exception e) { + } + catch (Exception e) { throw (FileSystemException)e; } } @@ -318,7 +321,8 @@ public OutputStream execute() throws FileSystemException }; try { return withConnectionRetry(retriable); - } catch (Exception e) { + } + catch (Exception e) { throw (FileSystemException)e; } } From 6a503a93e833927acffe55375853ee8f0dcc6c1e Mon Sep 17 00:00:00 2001 From: sonots Date: Wed, 16 Mar 2016 16:48:49 +0900 Subject: [PATCH 5/5] ignore close error since they would be already closed by InputStream or something --- .../embulk/output/sftp/SftpFileOutput.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java index 388b988..f9ac677 100644 --- a/src/main/java/org/embulk/output/sftp/SftpFileOutput.java +++ b/src/main/java/org/embulk/output/sftp/SftpFileOutput.java @@ -217,19 +217,21 @@ private void closeCurrentFile() try { currentFileOutputStream.close(); - currentFile.getContent().close(); + } + catch (IOException e) { + logger.info(e.getMessage()); + } + + try { currentFile.close(); } catch (FileSystemException e) { - IOException e) { - logger.error(e.getMessage()); - Throwables.propagate(e); - } - finally { - fileIndex++; - currentFile = null; - currentFileOutputStream = null; + logger.warn(e.getMessage()); } + + fileIndex++; + currentFile = null; + currentFileOutputStream = null; } private URI getSftpFileUri(String remoteFilePath)