Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
- **sequence_format**: format for sequence part of target keys (string, default: '.%03d.%02d')
- **bucket**: S3 bucket name (string, required)
- **endpoint**: S3 endpoint login user name (string, optional)
- **access_key_id**: AWS access key id (string, required)
- **secret_access_key**: AWS secret key (string, required)
- **access_key_id**: AWS access key id. This parameter is required when your agent is not running on EC2 instance with an IAM Role. (string, defualt: null)
- **secret_access_key**: AWS secret key. This parameter is required when your agent is not running on EC2 instance with an IAM Role. (string, defualt: null)
- **tmp_path_prefix**: prefix of temporary files (string, defualt: 'embulk-output-s3-')

## Example
Expand Down
55 changes: 28 additions & 27 deletions src/main/java/org/embulk/output/S3FileOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.slf4j.Logger;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.google.common.base.Optional;

public class S3FileOutputPlugin implements FileOutputPlugin {
public interface PluginTask extends Task {
Expand All @@ -49,10 +49,12 @@ public interface PluginTask extends Task {
public String getEndpoint();

@Config("access_key_id")
public String getAccessKeyId();
@ConfigDefault("null")
public Optional<String> getAccessKeyId();

@Config("secret_access_key")
public String getSecretAccessKey();
@ConfigDefault("null")
public Optional<String> getSecretAccessKey();

@Config("tmp_path_prefix")
@ConfigDefault("\"embulk-output-s3-\"")
Expand All @@ -75,30 +77,29 @@ public static class S3FileOutput implements FileOutput,
private OutputStream current;
private Path tempFilePath;

public static AWSCredentialsProvider getCredentialsProvider(
PluginTask task) {
final AWSCredentials cred = new BasicAWSCredentials(
task.getAccessKeyId(), task.getSecretAccessKey());
return new AWSCredentialsProvider() {
@Override
public AWSCredentials getCredentials() {
return cred;
}

@Override
public void refresh() {
}
};
}

private static AmazonS3Client newS3Client(PluginTask task) {
AWSCredentialsProvider credentials = getCredentialsProvider(task);

ClientConfiguration config = new ClientConfiguration();
// TODO: Support more configurations.

AmazonS3Client client = new AmazonS3Client(credentials, config);
client.setEndpoint(task.getEndpoint());
AmazonS3Client client = null;
try {
if (task.getAccessKeyId().isPresent()) {
BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(
task.getAccessKeyId().get(), task.getSecretAccessKey().get());

ClientConfiguration config = new ClientConfiguration();
// TODO: Support more configurations.

client = new AmazonS3Client(basicAWSCredentials, config);
} else {
if (System.getenv("AWS_ACCESS_KEY_ID") == null) {
client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider());
} else { // IAM ROLE
client = new AmazonS3Client();
}
}
client.setEndpoint(task.getEndpoint());
client.isRequesterPaysEnabled(task.getBucket()); // check s3 access.
} catch (Exception e) {
throw new RuntimeException("can't call S3 API. Please check your access_key_id / secret_access_key or s3_region configuration.", e);
}

return client;
}
Expand Down