2424import org .slf4j .Logger ;
2525
2626import com .amazonaws .ClientConfiguration ;
27- import com .amazonaws .auth .AWSCredentials ;
28- import com .amazonaws .auth .AWSCredentialsProvider ;
2927import com .amazonaws .auth .BasicAWSCredentials ;
28+ import com .amazonaws .auth .EnvironmentVariableCredentialsProvider ;
3029import com .amazonaws .services .s3 .AmazonS3Client ;
3130import com .amazonaws .services .s3 .model .PutObjectRequest ;
31+ import com .google .common .base .Optional ;
3232
3333public class S3FileOutputPlugin implements FileOutputPlugin {
3434 public interface PluginTask extends Task {
@@ -49,10 +49,12 @@ public interface PluginTask extends Task {
4949 public String getEndpoint ();
5050
5151 @ Config ("access_key_id" )
52- public String getAccessKeyId ();
52+ @ ConfigDefault ("null" )
53+ public Optional <String > getAccessKeyId ();
5354
5455 @ Config ("secret_access_key" )
55- public String getSecretAccessKey ();
56+ @ ConfigDefault ("null" )
57+ public Optional <String > getSecretAccessKey ();
5658
5759 @ Config ("tmp_path_prefix" )
5860 @ ConfigDefault ("\" embulk-output-s3-\" " )
@@ -75,30 +77,29 @@ public static class S3FileOutput implements FileOutput,
7577 private OutputStream current ;
7678 private Path tempFilePath ;
7779
78- public static AWSCredentialsProvider getCredentialsProvider (
79- PluginTask task ) {
80- final AWSCredentials cred = new BasicAWSCredentials (
81- task .getAccessKeyId (), task .getSecretAccessKey ());
82- return new AWSCredentialsProvider () {
83- @ Override
84- public AWSCredentials getCredentials () {
85- return cred ;
86- }
87-
88- @ Override
89- public void refresh () {
90- }
91- };
92- }
93-
9480 private static AmazonS3Client newS3Client (PluginTask task ) {
95- AWSCredentialsProvider credentials = getCredentialsProvider (task );
96-
97- ClientConfiguration config = new ClientConfiguration ();
98- // TODO: Support more configurations.
99-
100- AmazonS3Client client = new AmazonS3Client (credentials , config );
101- client .setEndpoint (task .getEndpoint ());
81+ AmazonS3Client client = null ;
82+ try {
83+ if (task .getAccessKeyId ().isPresent ()) {
84+ BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials (
85+ task .getAccessKeyId ().get (), task .getSecretAccessKey ().get ());
86+
87+ ClientConfiguration config = new ClientConfiguration ();
88+ // TODO: Support more configurations.
89+
90+ client = new AmazonS3Client (basicAWSCredentials , config );
91+ } else {
92+ if (System .getenv ("AWS_ACCESS_KEY_ID" ) == null ) {
93+ client = new AmazonS3Client (new EnvironmentVariableCredentialsProvider ());
94+ } else { // IAM ROLE
95+ client = new AmazonS3Client ();
96+ }
97+ }
98+ client .setEndpoint (task .getEndpoint ());
99+ client .isRequesterPaysEnabled (task .getBucket ()); // check s3 access.
100+ } catch (Exception e ) {
101+ throw new RuntimeException ("can't call S3 API. Please check your access_key_id / secret_access_key or s3_region configuration." , e );
102+ }
102103
103104 return client ;
104105 }
0 commit comments