Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ build/
.idea
*.iml
example/config.yml
.vagrant
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,21 @@ out:
```

## Run Example
replace settings in `example/sample.yml` before running.

```
$ ./gradlew classpath
```

Use `vagrant` to start a remote sshd server:

```
$ vagrant up
```

Run:


```
$ embulk run -Ilib example/sample.yml
```

Expand Down
20 changes: 20 additions & 0 deletions Vagrantfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- mode: ruby -*-
# vi: set ft=ruby :

# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
VAGRANTFILE_API_VERSION = "2"

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
# All Vagrant configuration is done here. The most common configuration
# options are documented and commented below. For a complete reference,
# please see the online documentation at vagrantup.com.

# Every Vagrant virtual environment requires a box to build off of.
config.vm.box = "centos6.5.3"
config.vm.box_url = 'https://github.com/2creatives/vagrant-centos/releases/download/v6.5.3/centos65-x86_64-20140116.box'

# name
config.vm.define "vagrant-centos"
config.ssh.forward_agent = true
config.vm.network "forwarded_port", guest: 22, host: 2210
end
13 changes: 7 additions & 6 deletions example/sample.yml → example/sample.yml.liquid
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ in:
out:
type: sftp
host: 127.0.0.1
port: 22
user: your_name
password: your_password
secret_key_file: your_secret_key_file
secret_key_passphrase: your_secret_key_passphrase
port: 2210
user: vagrant
# password:
secret_key_file: {{ env.PWD }}/.vagrant/machines/vagrant-centos/virtualbox/private_key
# secret_key_passphrase:
user_directory_is_root: true
path_prefix: /tmp/embulk_output_sftp/data
file_ext: .tsv
Expand All @@ -36,4 +36,5 @@ out:
quote: "\""
escape: "\\"
null_string: ""
default_timezone: 'UTC'
default_timezone: 'UTC'

107 changes: 83 additions & 24 deletions src/main/java/org/embulk/output/sftp/SftpFileOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.embulk.spi.unit.LocalFile;
import org.slf4j.Logger;

import java.lang.Void;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -144,7 +145,7 @@ public void nextFile()

try {
currentFile = newSftpFile(getSftpFileUri(getOutputFilePath()));
currentFileOutputStream = currentFile.getContent().getOutputStream();
currentFileOutputStream = newSftpOutputStream(currentFile);
logger.info("new sftp file: {}", currentFile.getPublicURIString());
}
catch (FileSystemException e) {
Expand All @@ -154,14 +155,26 @@ public void nextFile()
}

@Override
public void add(Buffer buffer)
public void add(final Buffer buffer)
{
if (currentFile == null) {
throw new IllegalStateException("nextFile() must be called before poll()");
}

try {
currentFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit());
Retriable<Void> retriable = new Retriable<Void>() {
public Void execute() throws IOException
{
currentFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit());
return null;
}
};
try {
withConnectionRetry(retriable);
}
catch (Exception e) {
throw (IOException)e;
}
}
catch (IOException e) {
logger.error(e.getMessage());
Expand Down Expand Up @@ -204,18 +217,21 @@ private void closeCurrentFile()

try {
currentFileOutputStream.close();
currentFile.getContent().close();
currentFile.close();
}
catch (IOException e) {
logger.error(e.getMessage());
Throwables.propagate(e);
logger.info(e.getMessage());
}
finally {
fileIndex++;
currentFile = null;
currentFileOutputStream = null;

try {
currentFile.close();
}
catch (FileSystemException e) {
logger.warn(e.getMessage());
}

fileIndex++;
currentFile = null;
currentFileOutputStream = null;
}

private URI getSftpFileUri(String remoteFilePath)
Expand All @@ -234,24 +250,25 @@ private String getOutputFilePath()
return pathPrefix + String.format(sequenceFormat, taskIndex, fileIndex) + fileNameExtension;
}

private FileObject newSftpFile(URI sftpUri)
throws FileSystemException
interface Retriable<T>
{
/**
* Execute the operation with the given (or null) return value.
*
* @return any return value from the operation
* @throws Exception
*/
public T execute() throws Exception;
}

private <T> T withConnectionRetry( final Retriable<T> op ) throws Exception {
int count = 0;
while (true) {
try {
FileObject file = manager.resolveFile(sftpUri.toString(), fsOptions);
if (file.getParent().exists()) {
logger.info("parent directory {} exists there", file.getParent());
return file;
}
else {
logger.info("trying to create parent directory {}", file.getParent());
file.getParent().createFolder();
}
return op.execute();
}
catch (FileSystemException e) {
if (++count == maxConnectionRetry) {
catch(final Exception e) {
if (++count > maxConnectionRetry) {
throw e;
}
logger.warn("failed to connect sftp server: " + e.getMessage(), e);
Expand All @@ -270,6 +287,48 @@ private FileObject newSftpFile(URI sftpUri)
}
}

private FileObject newSftpFile(final URI sftpUri)
throws FileSystemException
{
Retriable<FileObject> retriable = new Retriable<FileObject>() {
public FileObject execute() throws FileSystemException
{
FileObject file = manager.resolveFile(sftpUri.toString(), fsOptions);
if (file.getParent().exists()) {
logger.info("parent directory {} exists there", file.getParent());
}
else {
logger.info("trying to create parent directory {}", file.getParent());
file.getParent().createFolder();
}
return file;
}
};
try {
return withConnectionRetry(retriable);
}
catch (Exception e) {
throw (FileSystemException)e;
}
}

private OutputStream newSftpOutputStream(final FileObject file)
throws FileSystemException
{
Retriable<OutputStream> retriable = new Retriable<OutputStream>() {
public OutputStream execute() throws FileSystemException
{
return file.getContent().getOutputStream();
}
};
try {
return withConnectionRetry(retriable);
}
catch (Exception e) {
throw (FileSystemException)e;
}
}

private Function<LocalFile, String> localFileToPathString()
{
return new Function<LocalFile, String>()
Expand Down