-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
s3fs.h
396 lines (326 loc) · 14.5 KB
/
s3fs.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include <string>
#include <vector>
#include "arrow/filesystem/filesystem.h"
#include "arrow/util/macros.h"
#include "arrow/util/uri.h"
// Forward declarations of the AWS SDK types named by the declarations below.
// This header includes no AWS SDK headers; where these types are used in the
// visible declarations, it is only through std::shared_ptr, for which an
// incomplete type is sufficient.
namespace Aws {
namespace Auth {
class AWSCredentialsProvider;
class STSAssumeRoleCredentialsProvider;
} // namespace Auth
namespace STS {
class STSClient;
} // namespace STS
} // namespace Aws
namespace arrow {
namespace fs {
/// \brief Options for using a proxy for S3
///
/// The field names correspond to the components of a proxy URI such as
/// http://username:password@host:port (see FromUri()).
struct ARROW_EXPORT S3ProxyOptions {
/// Scheme of the proxy URI (the "http" part in the FromUri() examples)
std::string scheme;
/// Host of the proxy
std::string host;
/// Port of the proxy.
///
/// Defaults to -1. NOTE(review): presumably -1 means "no port specified";
/// confirm against the implementation.
int port = -1;
/// Username for the proxy; empty by default
std::string username;
/// Password for the proxy; empty by default
std::string password;
/// \brief Initialize from URI such as http://username:password@host:port
/// or http://host:port
///
/// \param[in] uri the proxy URI, given as a string
/// \return the resulting options, or an error. The failure conditions are
/// defined by the implementation, which is not part of this header.
static Result<S3ProxyOptions> FromUri(const std::string& uri);
/// \brief Same as the overload above, but taking a ::arrow::internal::Uri
/// object instead of a string
static Result<S3ProxyOptions> FromUri(const ::arrow::internal::Uri& uri);
/// \brief Compare with another set of proxy options
///
/// NOTE(review): presumably a field-by-field comparison; confirm against the
/// implementation.
bool Equals(const S3ProxyOptions& other) const;
};
/// \brief The kind of credentials an S3Options instance is using
///
/// Stored in S3Options::credentials_kind.
enum class S3CredentialsKind : int8_t {
/// Anonymous access (no credentials used)
Anonymous,
/// Use default AWS credentials, configured through environment variables
Default,
/// Use explicitly-provided access key pair
Explicit,
/// Assume role through a role ARN
Role,
/// Use web identity token to assume role, configured through environment variables
WebIdentity
};
/// \brief Pure virtual class for describing custom S3 retry strategies
///
/// Subclasses must implement ShouldRetry() and CalculateDelayBeforeNextRetry().
/// A strategy can be supplied through S3Options::retry_strategy.
class ARROW_EXPORT S3RetryStrategy {
public:
virtual ~S3RetryStrategy() = default;
/// \brief Simple struct where each field corresponds to a field in
/// Aws::Client::AWSError
///
/// `error_type` and `should_retry` have no default initializer, so code that
/// creates an AWSErrorDetail should set every field explicitly.
struct AWSErrorDetail {
/// Corresponds to AWSError::GetErrorType()
int error_type;
/// Corresponds to AWSError::GetMessage()
std::string message;
/// Corresponds to AWSError::GetExceptionName()
std::string exception_name;
/// Corresponds to AWSError::ShouldRetry()
bool should_retry;
};
/// \brief Returns true if the S3 request resulting in the provided error
/// should be retried.
///
/// \param[in] error details of the error produced by the S3 request
/// \param[in] attempted_retries NOTE(review): presumably the number of retries
/// already attempted for this request; confirm against the caller
virtual bool ShouldRetry(const AWSErrorDetail& error, int64_t attempted_retries) = 0;
/// \brief Returns the time in milliseconds the S3 client should sleep for
/// until retrying.
///
/// \param[in] error details of the error produced by the S3 request
/// \param[in] attempted_retries NOTE(review): presumably the number of retries
/// already attempted for this request; confirm against the caller
virtual int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& error,
int64_t attempted_retries) = 0;
/// \brief Returns a stock AWS Default retry strategy.
///
/// \param[in] max_attempts NOTE(review): presumably the maximum number of
/// attempts the returned strategy allows; confirm against the implementation
static std::shared_ptr<S3RetryStrategy> GetAwsDefaultRetryStrategy(
int64_t max_attempts);
/// \brief Returns a stock AWS Standard retry strategy.
///
/// \param[in] max_attempts NOTE(review): presumably the maximum number of
/// attempts the returned strategy allows; confirm against the implementation
static std::shared_ptr<S3RetryStrategy> GetAwsStandardRetryStrategy(
int64_t max_attempts);
};
/// \brief Options for the S3FileSystem implementation.
///
/// An instance is passed to S3FileSystem::Make(). Instances can be obtained
/// from the static factories (Defaults(), Anonymous(), FromAccessKey(),
/// FromAssumeRole(), FromAssumeRoleWithWebIdentity(), FromUri()), and the
/// Configure*() methods change the credentials of an existing instance.
struct ARROW_EXPORT S3Options {
/// \brief AWS region to connect to.
///
/// If unset, the AWS SDK will choose a default value. The exact algorithm
/// depends on the SDK version. Before 1.8, the default is hardcoded
/// to "us-east-1". Since 1.8, several heuristics are used to determine
/// the region (environment variables, configuration profile, EC2 metadata
/// server).
std::string region;
/// \brief Socket connection timeout, in seconds
///
/// If negative, the AWS SDK default value is used (typically 1 second).
double connect_timeout = -1;
/// \brief Socket read timeout on Windows and macOS, in seconds
///
/// If negative, the AWS SDK default value is used (typically 3 seconds).
/// This option is ignored on non-Windows, non-macOS systems.
double request_timeout = -1;
/// If non-empty, override region with a connect string such as "localhost:9000"
// XXX perhaps instead take a URL like "http://localhost:9000"?
std::string endpoint_override;
/// S3 connection transport, default "https"
std::string scheme = "https";
/// ARN of role to assume
std::string role_arn;
/// Optional identifier for an assumed role session.
std::string session_name;
/// Optional external identifier to pass to STS when assuming a role
std::string external_id;
/// Frequency (in seconds) to refresh temporary credentials from assumed role.
/// The default of 900 seconds is 15 minutes.
int load_frequency = 900;
/// If connection is through a proxy, set options here
S3ProxyOptions proxy_options;
/// AWS credentials provider
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
/// Type of credentials being used. Set along with credentials_provider.
S3CredentialsKind credentials_kind = S3CredentialsKind::Default;
/// Whether to use virtual addressing of buckets
///
/// If true, then virtual addressing is always enabled.
/// If false, then virtual addressing is only enabled if `endpoint_override` is empty.
///
/// This can be used for non-AWS backends that only support virtual hosted-style access.
bool force_virtual_addressing = false;
/// Whether OutputStream writes will be issued in the background, without blocking.
bool background_writes = true;
/// Whether to allow creation of buckets
///
/// When S3FileSystem creates new buckets, it does not pass any non-default settings.
/// In AWS S3, the bucket and all objects will not be publicly visible, and there
/// will be no bucket policies and no resource tags. To have more control over how
/// buckets are created, use a different API to create them.
bool allow_bucket_creation = false;
/// Whether to allow deletion of buckets
bool allow_bucket_deletion = false;
/// \brief Default metadata for OpenOutputStream.
///
/// This will be ignored if non-empty metadata is passed to OpenOutputStream.
std::shared_ptr<const KeyValueMetadata> default_metadata;
/// Optional retry strategy to determine which error types should be retried, and the
/// delay between retries.
std::shared_ptr<S3RetryStrategy> retry_strategy;
/// \brief Construct a set of options.
///
/// Fields with an in-class initializer above start from that value. Anything
/// else the constructor does is defined in the implementation file, which is
/// not part of this header.
S3Options();
/// Configure with the default AWS credentials provider chain.
void ConfigureDefaultCredentials();
/// Configure with anonymous credentials. This will only let you access public buckets.
void ConfigureAnonymousCredentials();
/// \brief Configure with explicit access and secret key.
///
/// \param[in] access_key the access key
/// \param[in] secret_key the secret key
/// \param[in] session_token optional session token; defaults to an empty string
void ConfigureAccessKey(const std::string& access_key, const std::string& secret_key,
const std::string& session_token = "");
/// \brief Configure with credentials from an assumed role.
///
/// The first four parameters have the same names as the `role_arn`,
/// `session_name`, `external_id` and `load_frequency` fields documented above.
/// \param[in] stsClient optional STS client; defaults to NULLPTR.
/// NOTE(review): how a null client is handled is defined by the implementation.
void ConfigureAssumeRoleCredentials(
const std::string& role_arn, const std::string& session_name = "",
const std::string& external_id = "", int load_frequency = 900,
const std::shared_ptr<Aws::STS::STSClient>& stsClient = NULLPTR);
/// Configure with credentials from role assumed using a web identity token
void ConfigureAssumeRoleWithWebIdentityCredentials();
/// \brief Accessors for the access key, secret key and session token.
///
/// NOTE(review): presumably these are obtained from `credentials_provider`;
/// confirm against the implementation, including what is returned when no
/// provider is set.
std::string GetAccessKey() const;
std::string GetSecretKey() const;
std::string GetSessionToken() const;
/// \brief Compare with another set of options
///
/// NOTE(review): which fields take part in the comparison is defined by the
/// implementation.
bool Equals(const S3Options& other) const;
/// \brief Initialize with default credentials provider chain
///
/// This is recommended if you use the standard AWS environment variables
/// and/or configuration file.
static S3Options Defaults();
/// \brief Initialize with anonymous credentials.
///
/// This will only let you access public buckets.
static S3Options Anonymous();
/// \brief Initialize with explicit access and secret key.
///
/// Optionally, a session token may also be provided for temporary credentials
/// (from STS).
static S3Options FromAccessKey(const std::string& access_key,
const std::string& secret_key,
const std::string& session_token = "");
/// \brief Initialize from an assumed role.
///
/// Takes the same parameters, with the same defaults, as
/// ConfigureAssumeRoleCredentials().
static S3Options FromAssumeRole(
const std::string& role_arn, const std::string& session_name = "",
const std::string& external_id = "", int load_frequency = 900,
const std::shared_ptr<Aws::STS::STSClient>& stsClient = NULLPTR);
/// \brief Initialize from an assumed role with web-identity.
/// Uses the AWS SDK which uses environment variables to
/// generate temporary credentials.
static S3Options FromAssumeRoleWithWebIdentity();
/// \brief Initialize from a URI.
///
/// \param[in] uri the URI to build the options from
/// \param[out] out_path optional, may be NULLPTR (the default).
/// NOTE(review): presumably receives the path part of the URI when non-null;
/// confirm against the implementation.
/// \return the resulting options, or an error
static Result<S3Options> FromUri(const ::arrow::internal::Uri& uri,
std::string* out_path = NULLPTR);
/// \brief Same as the overload above, but taking the URI as a string
static Result<S3Options> FromUri(const std::string& uri,
std::string* out_path = NULLPTR);
};
/// \brief S3-backed FileSystem implementation.
///
/// Instances are created with the static Make() function; the constructor is
/// protected.
///
/// Some implementation notes:
/// - buckets are special and the operations available on them may be limited
/// or more expensive than desired.
class ARROW_EXPORT S3FileSystem : public FileSystem {
public:
~S3FileSystem() override;
/// Return the type name of this filesystem, which is always "s3"
std::string type_name() const override { return "s3"; }
/// Return the original S3 options when constructing the filesystem
/// (returned by value)
S3Options options() const;
/// Return the actual region this filesystem connects to
std::string region() const;
bool Equals(const FileSystem& other) const override;
Result<std::string> PathFromUri(const std::string& uri_string) const override;
/// \cond FALSE
using FileSystem::GetFileInfo;
/// \endcond
Result<FileInfo> GetFileInfo(const std::string& path) override;
Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override;
FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override;
Status CreateDir(const std::string& path, bool recursive = true) override;
Status DeleteDir(const std::string& path) override;
Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override;
Future<> DeleteDirContentsAsync(const std::string& path,
bool missing_dir_ok = false) override;
Status DeleteRootDirContents() override;
Status DeleteFile(const std::string& path) override;
Status Move(const std::string& src, const std::string& dest) override;
Status CopyFile(const std::string& src, const std::string& dest) override;
/// Create a sequential input stream for reading from a S3 object.
///
/// NOTE: Reads from the stream will be synchronous and unbuffered.
/// You may want to wrap the stream in a BufferedInputStream or use
/// a custom readahead strategy to avoid idle waits.
Result<std::shared_ptr<io::InputStream>> OpenInputStream(
const std::string& path) override;
/// Create a sequential input stream for reading from a S3 object.
///
/// This override avoids a HEAD request by assuming the FileInfo
/// contains correct information.
Result<std::shared_ptr<io::InputStream>> OpenInputStream(const FileInfo& info) override;
/// Create a random access file for reading from a S3 object.
///
/// See OpenInputStream for performance notes.
Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
const std::string& path) override;
/// Create a random access file for reading from a S3 object.
///
/// This override avoids a HEAD request by assuming the FileInfo
/// contains correct information.
Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
const FileInfo& info) override;
/// Create a sequential output stream for writing to a S3 object.
///
/// NOTE: Writes to the stream will be buffered. Depending on
/// S3Options.background_writes, they can be synchronous or not.
/// It is recommended to enable background_writes unless you prefer
/// implementing your own background execution strategy.
///
/// If `metadata` is empty, S3Options.default_metadata is used instead (see
/// the documentation of that field).
Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
const std::string& path,
const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
/// Override of FileSystem::OpenAppendStream.
///
/// NOTE(review): this header does not show whether appending to an S3 object
/// is actually supported; confirm the behavior against the implementation.
Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
const std::string& path,
const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
/// \brief Create a S3FileSystem instance from the given options.
///
/// \param[in] options the S3 options to use
/// \param[in] (unnamed) the IOContext to use; defaults to io::default_io_context()
/// \return the new filesystem, or an error
static Result<std::shared_ptr<S3FileSystem>> Make(
const S3Options& options, const io::IOContext& = io::default_io_context());
protected:
// Not public: instances are created through Make().
explicit S3FileSystem(const S3Options& options, const io::IOContext&);
// Impl is only declared here and defined elsewhere; this class holds it
// through a shared_ptr.
class Impl;
std::shared_ptr<Impl> impl_;
};
/// \brief Log levels for the S3 APIs, used by S3GlobalOptions::log_level
///
/// The underlying values run from 0 (Off) to 6 (Trace), in declaration order.
enum class S3LogLevel : int8_t { Off, Fatal, Error, Warn, Info, Debug, Trace };
/// \brief Global options used when initializing the S3 APIs
///
/// An instance is passed to InitializeS3(). Use Defaults() to obtain a set of
/// options that takes the environment into account.
struct ARROW_EXPORT S3GlobalOptions {
  /// \brief Log level for the S3 APIs
  ///
  /// Has an in-class default so that a default-constructed S3GlobalOptions never
  /// holds an indeterminate value (reading one would be undefined behavior).
  /// Defaults() may replace it with a value taken from the environment variable
  /// ARROW_S3_LOG_LEVEL.
  /// NOTE(review): Fatal was chosen as a quiet fallback; confirm that it matches
  /// the fallback Defaults() uses when ARROW_S3_LOG_LEVEL is not set.
  S3LogLevel log_level = S3LogLevel::Fatal;

  /// The number of threads to configure when creating AWS' I/O event loop
  ///
  /// Defaults to 1 as recommended by AWS' doc when the # of connections is
  /// expected to be, at most, in the hundreds
  ///
  /// For more details see Aws::Crt::Io::EventLoopGroup
  int num_event_loop_threads = 1;

  /// \brief Initialize with default options
  ///
  /// For log_level, this method first tries to extract a suitable value from the
  /// environment variable ARROW_S3_LOG_LEVEL.
  static S3GlobalOptions Defaults();
};
/// \brief Initialize the S3 APIs with the specified set of options.
///
/// It is required to call this function at least once before using S3FileSystem.
///
/// Once this function is called you MUST call FinalizeS3 before the end of the
/// application in order to avoid a segmentation fault at shutdown.
///
/// Calls to InitializeS3() and FinalizeS3() should be serialized by the
/// application.
///
/// \param[in] options the global options (log level, number of event loop threads)
/// \return a Status reporting the outcome of the initialization
ARROW_EXPORT
Status InitializeS3(const S3GlobalOptions& options);
/// \brief Ensure the S3 APIs are initialized, but only if not already done.
///
/// If necessary, this will call InitializeS3() with some default options.
///
/// Like InitializeS3() and FinalizeS3(), calls to this function should be
/// serialized by the application.
///
/// \return a Status reporting the outcome
ARROW_EXPORT
Status EnsureS3Initialized();
/// \brief Whether S3 was initialized, and not finalized.
///
/// \return true if the S3 APIs have been initialized and have not been
/// finalized since, false otherwise
ARROW_EXPORT
bool IsS3Initialized();
/// \brief Whether S3 was finalized.
///
/// \return true if the S3 APIs have been finalized, false otherwise
ARROW_EXPORT
bool IsS3Finalized();
/// \brief Shut down the S3 APIs.
///
/// This can wait for some S3 concurrent calls to finish so as to avoid
/// race conditions.
/// After this function has been called, all S3 calls will fail with an error.
///
/// Calls to InitializeS3() and FinalizeS3() should be serialized by the
/// application (this also applies to EnsureS3Initialized() and
/// EnsureS3Finalized()).
///
/// \return a Status reporting the outcome of the shutdown
ARROW_EXPORT
Status FinalizeS3();
/// \brief Ensure the S3 APIs are shut down, but only if not already done.
///
/// If necessary, this will call FinalizeS3().
///
/// Like InitializeS3() and FinalizeS3(), calls to this function should be
/// serialized by the application.
///
/// \return a Status reporting the outcome
ARROW_EXPORT
Status EnsureS3Finalized();
/// \brief Resolve the region of an S3 bucket.
///
/// NOTE(review): this description is derived from the function name and
/// signature only; how the region is looked up, and whether the S3 APIs must
/// be initialized first, is defined by the implementation and should be
/// confirmed there.
///
/// \param[in] bucket the bucket name
/// \return a string (presumably the region name), or an error
ARROW_EXPORT
Result<std::string> ResolveS3BucketRegion(const std::string& bucket);
} // namespace fs
} // namespace arrow