25
25
import java .util .List ;
26
26
import java .util .Locale ;
27
27
28
+ import com .amazonaws .Protocol ;
29
+ import com .amazonaws .auth .AWSCredentialsProvider ;
30
+ import com .amazonaws .client .builder .AwsClientBuilder ;
31
+ import com .amazonaws .retry .PredefinedRetryPolicies ;
32
+ import com .amazonaws .services .s3 .AmazonS3 ;
33
+ import com .amazonaws .services .s3 .AmazonS3ClientBuilder ;
28
34
import org .embulk .util .config .Config ;
29
35
import org .embulk .util .config .ConfigDefault ;
30
36
import org .embulk .config .ConfigDiff ;
44
50
import org .slf4j .Logger ;
45
51
46
52
import com .amazonaws .ClientConfiguration ;
47
- import com .amazonaws .auth .BasicAWSCredentials ;
48
- import com .amazonaws .services .s3 .AmazonS3Client ;
49
53
import com .amazonaws .services .s3 .model .CannedAccessControlList ;
50
54
import com .amazonaws .services .s3 .model .PutObjectRequest ;
55
+ import org .embulk .util .aws .credentials .AwsCredentials ;
56
+ import org .embulk .util .aws .credentials .AwsCredentialsTask ;
51
57
import org .slf4j .LoggerFactory ;
52
58
53
59
import java .util .Optional ;
@@ -63,7 +69,7 @@ public class S3FileOutputPlugin
63
69
private static final ConfigMapper CONFIG_MAPPER = CONFIG_MAPPER_FACTORY .createConfigMapper ();
64
70
65
71
public interface PluginTask
66
- extends Task
72
+ extends AwsCredentialsTask , Task
67
73
{
68
74
@ Config ("path_prefix" )
69
75
String getPathPrefix ();
@@ -82,13 +88,10 @@ public interface PluginTask
82
88
@ ConfigDefault ("null" )
83
89
Optional <String > getEndpoint ();
84
90
85
- @ Config ("access_key_id " )
91
+ @ Config ("http_proxy " )
86
92
@ ConfigDefault ("null" )
87
- Optional <String > getAccessKeyId ();
88
-
89
- @ Config ("secret_access_key" )
90
- @ ConfigDefault ("null" )
91
- Optional <String > getSecretAccessKey ();
93
+ Optional <HttpProxy > getHttpProxy ();
94
+ void setHttpProxy (Optional <HttpProxy > httpProxy );
92
95
93
96
@ Config ("proxy_host" )
94
97
@ ConfigDefault ("null" )
@@ -109,6 +112,10 @@ public interface PluginTask
109
112
@ Config ("canned_acl" )
110
113
@ ConfigDefault ("null" )
111
114
Optional <CannedAccessControlList > getCannedAccessControlList ();
115
+
116
+ @ Config ("region" )
117
+ @ ConfigDefault ("null" )
118
+ Optional <String > getRegion ();
112
119
}
113
120
114
121
public static class S3FileOutput
@@ -124,42 +131,122 @@ public static class S3FileOutput
124
131
125
132
private int taskIndex ;
126
133
private int fileIndex ;
127
- private AmazonS3Client client ;
134
+ private AmazonS3 client ;
128
135
private OutputStream current ;
129
136
private Path tempFilePath ;
130
137
private String tempPath = null ;
131
138
132
- private static AmazonS3Client newS3Client (PluginTask task )
139
+ private AmazonS3 newS3Client (final PluginTask task )
140
+ {
141
+ Optional <String > endpoint = task .getEndpoint ();
142
+ Optional <String > region = task .getRegion ();
143
+
144
+ final AmazonS3ClientBuilder builder = AmazonS3ClientBuilder
145
+ .standard ()
146
+ .withCredentials (getCredentialsProvider (task ))
147
+ .withClientConfiguration (getClientConfiguration (task ));
148
+
149
+ // Favor the `endpoint` configuration, then `region`, if both are absent then `s3.amazonaws.com` will be used.
150
+ if (endpoint .isPresent ()) {
151
+ if (region .isPresent ()) {
152
+ logger .warn ("Either configure endpoint or region, " +
153
+ "if both is specified only the endpoint will be in effect." );
154
+ }
155
+ builder .setEndpointConfiguration (new AwsClientBuilder .EndpointConfiguration (endpoint .get (), null ));
156
+ }
157
+ else if (region .isPresent ()) {
158
+ builder .setRegion (region .get ());
159
+ }
160
+ else {
161
+ // This is to keep the AWS SDK upgrading to 1.11.x to be backward compatible with old configuration.
162
+ //
163
+ // On SDK 1.10.x, when neither endpoint nor region is set explicitly, the client's endpoint will be by
164
+ // default `s3.amazonaws.com`. And for pre-Signature-V4, this will work fine as the bucket's region
165
+ // will be resolved to the appropriate region on server (AWS) side.
166
+ //
167
+ // On SDK 1.11.x, a region will be computed on client side by AwsRegionProvider and the endpoint now will
168
+ // be region-specific `<region>.s3.amazonaws.com` and might be the wrong one.
169
+ //
170
+ // So a default endpoint of `s3.amazonaws.com` when both endpoint and region configs are absent are
171
+ // necessary to make old configurations won't suddenly break. The side effect is that this will render
172
+ // AwsRegionProvider useless. And it's worth to note that Signature-V4 won't work with either versions with
173
+ // no explicit region or endpoint as the region (inferrable from endpoint) are necessary for signing.
174
+ builder .setEndpointConfiguration (new AwsClientBuilder .EndpointConfiguration ("s3.amazonaws.com" , null ));
175
+ }
176
+
177
+ builder .withForceGlobalBucketAccessEnabled (true );
178
+ return builder .build ();
179
+ }
180
+
181
+ private AWSCredentialsProvider getCredentialsProvider (PluginTask task )
133
182
{
134
- AmazonS3Client client ;
183
+ return AwsCredentials .getAWSCredentialsProvider (task );
184
+ }
185
+
186
+ private ClientConfiguration getClientConfiguration (PluginTask task )
187
+ {
188
+ ClientConfiguration clientConfig = new ClientConfiguration ();
135
189
136
- // TODO: Support more configurations.
137
- ClientConfiguration config = new ClientConfiguration ();
190
+ clientConfig .setMaxConnections (50 ); // SDK default: 50
191
+ clientConfig .setSocketTimeout (8 * 60 * 1000 ); // SDK default: 50*1000
192
+ clientConfig .setRetryPolicy (PredefinedRetryPolicies .NO_RETRY_POLICY );
138
193
194
+ // set http proxy
195
+ // backward compatibility
139
196
if (task .getProxyHost ().isPresent ()) {
140
- config .setProxyHost (task .getProxyHost ().get ());
197
+ logger .warn ("Configuration with \" proxy_host\" is deprecated. Use \" http_proxy.host\" instead." );
198
+ if (!task .getHttpProxy ().isPresent ()) {
199
+ ConfigMapper configMapper = CONFIG_MAPPER_FACTORY .createConfigMapper ();
200
+ ConfigSource configSource = CONFIG_MAPPER_FACTORY .newConfigSource ();
201
+ configSource .set ("host" , task .getProxyHost ().get ());
202
+ HttpProxy httpProxy = configMapper .map (configSource , HttpProxy .class );
203
+ task .setHttpProxy (Optional .of (httpProxy ));
204
+ } else {
205
+ HttpProxy httpProxy = task .getHttpProxy ().get ();
206
+ if (httpProxy .getHost ().isEmpty ()) {
207
+ httpProxy .setHost (task .getProxyHost ().get ());
208
+ task .setHttpProxy (Optional .of (httpProxy ));
209
+ }
210
+ }
141
211
}
142
212
143
213
if (task .getProxyPort ().isPresent ()) {
144
- config .setProxyPort (task .getProxyPort ().get ());
214
+ logger .warn ("Configuration with \" proxy_port\" is deprecated. Use \" http_proxy.port\" instead." );
215
+ HttpProxy httpProxy = task .getHttpProxy ().get ();
216
+ if (!httpProxy .getPort ().isPresent ()) {
217
+ httpProxy .setPort (task .getProxyPort ());
218
+ task .setHttpProxy (Optional .of (httpProxy ));
219
+ }
145
220
}
146
221
147
- if (task .getAccessKeyId ().isPresent ()) {
148
- BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials (
149
- task .getAccessKeyId ().get (), task .getSecretAccessKey ().get ());
150
-
151
- client = new AmazonS3Client (basicAWSCredentials , config );
222
+ if (task .getHttpProxy ().isPresent ()) {
223
+ setHttpProxyInAwsClient (clientConfig , task .getHttpProxy ().get ());
152
224
}
153
- else {
154
- // Use default credential provider chain.
155
- client = new AmazonS3Client (config );
225
+
226
+ return clientConfig ;
227
+ }
228
+
229
+ private void setHttpProxyInAwsClient (ClientConfiguration clientConfig , HttpProxy httpProxy ) {
230
+ // host
231
+ clientConfig .setProxyHost (httpProxy .getHost ());
232
+
233
+ // port
234
+ if (httpProxy .getPort ().isPresent ()) {
235
+ clientConfig .setProxyPort (httpProxy .getPort ().get ());
156
236
}
157
237
158
- if (task .getEndpoint ().isPresent ()) {
159
- client .setEndpoint (task .getEndpoint ().get ());
238
+ // https
239
+ clientConfig .setProtocol (httpProxy .getHttps () ? Protocol .HTTPS : Protocol .HTTP );
240
+
241
+ // user
242
+ if (httpProxy .getUser ().isPresent ()) {
243
+ clientConfig .setProxyUsername (httpProxy .getUser ().get ());
160
244
}
161
245
162
- return client ;
246
+ // password
247
+ if (httpProxy .getPassword ().isPresent ()) {
248
+ clientConfig .setProxyPassword (httpProxy .getPassword ().get ());
249
+ }
163
250
}
164
251
165
252
public S3FileOutput (PluginTask task , int taskIndex )
0 commit comments