GH-40279: [C++] Reduce S3Client initialization time (#40299)
### Rationale for this change

By default, S3Client instantiation is extremely slow (around 1 ms per instance). Investigation showed that most of this time is spent inside the AWS SDK, parsing a hardcoded piece of JSON data when instantiating an AWS rule engine.

Python benchmarks show this repeated initialization cost:
```python
>>> from pyarrow.fs import S3FileSystem

>>> %time s = S3FileSystem()
CPU times: user 21.1 ms, sys: 0 ns, total: 21.1 ms
Wall time: 20.9 ms
>>> %time s = S3FileSystem()
CPU times: user 2.37 ms, sys: 0 ns, total: 2.37 ms
Wall time: 2.18 ms
>>> %time s = S3FileSystem()
CPU times: user 2.42 ms, sys: 0 ns, total: 2.42 ms
Wall time: 2.23 ms

>>> %timeit s = S3FileSystem()
1.28 ms ± 4.03 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
>>> %timeit s = S3FileSystem()
1.28 ms ± 2.6 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
>>> %timeit s = S3FileSystem(anonymous=True)
1.26 ms ± 2.46 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
```

### What changes are included in this PR?

Instead of letting the AWS SDK create a new S3EndpointProvider for each S3Client, create a single S3EndpointProvider per distinct set of endpoint configuration options and reuse it. The 1 ms instantiation cost is then paid only the first time a given set of endpoint configuration options is seen.
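
The caching idea can be illustrated with a minimal Python sketch. This is not the C++ code from this commit: the class names merely mirror the ones in the diff, `factory` is a hypothetical stand-in for the expensive `S3EndpointProvider` construction, and a per-slot lock plays the role that `std::call_once` plays in C++.

```python
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class EndpointConfigKey:
    """Hashable summary of the options that affect endpoint resolution."""
    region: str
    scheme: str
    endpoint_override: str
    use_virtual_addressing: bool


class _CacheValue:
    """Per-key slot: a lock guarding one-time setup, plus the cached object."""

    def __init__(self):
        self.lock = threading.Lock()
        self.initialized = False
        self.provider = None


class EndpointProviderCache:
    def __init__(self, factory):
        # `factory` is hypothetical: it builds the expensive object for a key.
        self._factory = factory
        self._mutex = threading.Lock()
        self._cache = {}

    def lookup(self, key):
        # Hold the map lock only long enough to find or create the slot.
        with self._mutex:
            value = self._cache.get(key)
            if value is None:
                value = self._cache[key] = _CacheValue()
        # Pay the construction cost at most once per key, without holding
        # the map lock, so lookups for other keys are not blocked.
        with value.lock:
            if not value.initialized:
                value.provider = self._factory(key)
                value.initialized = True
            return value.provider

    def reset(self):
        # Drop all cached objects, e.g. before shutting the SDK down.
        with self._mutex:
            self._cache.clear()
```

Two lookups with equal keys return the same object and call `factory` once; a key that differs in any field gets its own slot and its own one-time setup.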

Python benchmarks show the initialization cost has become a one-time cost:
```python
>>> from pyarrow.fs import S3FileSystem

>>> %time s = S3FileSystem()
CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 19.8 ms
>>> %time s = S3FileSystem()
CPU times: user 404 µs, sys: 49 µs, total: 453 µs
Wall time: 266 µs
>>> %time s = S3FileSystem()
CPU times: user 361 µs, sys: 42 µs, total: 403 µs
Wall time: 249 µs

>>> %timeit s = S3FileSystem()
50.4 µs ± 227 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
>>> %timeit s = S3FileSystem(anonymous=True)
33.5 µs ± 306 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
```
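
The `%time` and `%timeit` lines above are IPython magics. Outside IPython, a roughly comparable measurement could be taken with the standard `timeit` module, as sketched below. The helper names `per_call_seconds` and `benchmark_s3filesystem` are illustrative, the latter assumes a pyarrow build with S3 support, and the sketch reports the best run where `%timeit` reports mean ± standard deviation, so the numbers are not directly interchangeable.

```python
import timeit


def per_call_seconds(factory, number=1000, repeat=7):
    """Return the best observed average time per call of `factory`, in seconds."""
    # timeit.repeat returns one total duration per repetition of `number` calls.
    totals = timeit.repeat(factory, number=number, repeat=repeat)
    return min(totals) / number


def benchmark_s3filesystem():
    # Assumption: pyarrow is installed and was built with S3 support.
    from pyarrow.fs import S3FileSystem

    S3FileSystem()  # warm-up: the first instantiation includes one-time setup
    seconds = per_call_seconds(S3FileSystem)
    print(f"{seconds * 1e6:.1f} µs per S3FileSystem() instantiation")
```

Calling `benchmark_s3filesystem()` before and after this change should show whether the per-instance cost has dropped; absolute numbers depend on the machine and the AWS SDK version.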

### Are these changes tested?

By existing tests.

### Are there any user-facing changes?

No.
* GitHub Issue: #40279

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
pitrou committed Mar 25, 2024
1 parent 1781b32 commit 3095344
Showing 1 changed file with 161 additions and 13 deletions.
174 changes: 161 additions & 13 deletions cpp/src/arrow/filesystem/s3fs.cc
@@ -99,12 +99,21 @@
#define ARROW_S3_HAS_CRT
#endif

#if ARROW_AWS_SDK_VERSION_CHECK(1, 10, 0)
#define ARROW_S3_HAS_S3CLIENT_CONFIGURATION
#endif

#ifdef ARROW_S3_HAS_CRT
#include <aws/crt/io/Bootstrap.h>
#include <aws/crt/io/EventLoopGroup.h>
#include <aws/crt/io/HostResolver.h>
#endif

#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION
#include <aws/s3/S3ClientConfiguration.h>
#include <aws/s3/S3EndpointProvider.h>
#endif

#include "arrow/util/windows_fixup.h"

#include "arrow/buffer.h"
@@ -128,19 +137,17 @@
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"

namespace arrow {

using internal::TaskGroup;
using internal::ToChars;
using io::internal::SubmitIO;
using util::Uri;

namespace fs {
namespace arrow::fs {

using ::Aws::Client::AWSError;
using ::Aws::S3::S3Errors;
namespace S3Model = Aws::S3::Model;

using ::arrow::internal::TaskGroup;
using ::arrow::internal::ToChars;
using ::arrow::io::internal::SubmitIO;
using ::arrow::util::Uri;

using internal::ConnectRetryStrategy;
using internal::DetectS3Backend;
using internal::ErrorToStatus;
@@ -913,6 +920,134 @@ Result<std::shared_ptr<S3ClientHolder>> GetClientHolder(
// -----------------------------------------------------------------------
// S3 client factory: build S3Client from S3Options

#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION

// GH-40279: standard initialization of S3Client creates a new `S3EndpointProvider`
// every time. Its construction takes 1ms, which makes instantiating every S3Client
// very costly (see upstream bug report
// at https://github.com/aws/aws-sdk-cpp/issues/2880).
// To work around this, we build and cache `S3EndpointProvider` instances
// for each distinct endpoint configuration, and reuse them whenever possible.
// Since most applications tend to use a single endpoint configuration, this
// makes the 1ms setup cost a once-per-process overhead, making it much more
// bearable - if not ideal.

struct EndpointConfigKey {
explicit EndpointConfigKey(const Aws::S3::S3ClientConfiguration& config)
: region(config.region),
scheme(config.scheme),
endpoint_override(config.endpointOverride),
use_virtual_addressing(config.useVirtualAddressing) {}

Aws::String region;
Aws::Http::Scheme scheme;
Aws::String endpoint_override;
bool use_virtual_addressing;

bool operator==(const EndpointConfigKey& other) const noexcept {
return region == other.region && scheme == other.scheme &&
endpoint_override == other.endpoint_override &&
use_virtual_addressing == other.use_virtual_addressing;
}
};

} // namespace
} // namespace arrow::fs

template <>
struct std::hash<arrow::fs::EndpointConfigKey> {
std::size_t operator()(const arrow::fs::EndpointConfigKey& key) const noexcept {
// A crude hash is sufficient since we expect the cache to remain very small.
auto h = std::hash<Aws::String>{};
return h(key.region) ^ h(key.endpoint_override);
}
};

namespace arrow::fs {
namespace {

// EndpointProvider configuration happens in a non-thread-safe way, even
// when the updates are idempotent. This is a problem when trying to reuse
// a single EndpointProvider from several clients.
// To work around this, this class ensures reconfiguration of an existing
// EndpointProvider is a no-op.
class InitOnceEndpointProvider : public Aws::S3::S3EndpointProviderBase {
public:
explicit InitOnceEndpointProvider(
std::shared_ptr<Aws::S3::S3EndpointProviderBase> wrapped)
: wrapped_(std::move(wrapped)) {}

void InitBuiltInParameters(const Aws::S3::S3ClientConfiguration& config) override {}

void OverrideEndpoint(const Aws::String& endpoint) override {
ARROW_LOG(ERROR) << "unexpected call to InitOnceEndpointProvider::OverrideEndpoint";
}
Aws::S3::Endpoint::S3ClientContextParameters& AccessClientContextParameters() override {
ARROW_LOG(ERROR)
<< "unexpected call to InitOnceEndpointProvider::AccessClientContextParameters";
// Need to return a reference to something...
return wrapped_->AccessClientContextParameters();
}

const Aws::S3::Endpoint::S3ClientContextParameters& GetClientContextParameters()
const override {
return wrapped_->GetClientContextParameters();
}
Aws::Endpoint::ResolveEndpointOutcome ResolveEndpoint(
const Aws::Endpoint::EndpointParameters& params) const override {
return wrapped_->ResolveEndpoint(params);
}

protected:
std::shared_ptr<Aws::S3::S3EndpointProviderBase> wrapped_;
};

// A class that instantiates a single EndpointProvider per distinct endpoint
// configuration and initializes it in a thread-safe way. See earlier comments
// for rationale.
class EndpointProviderCache {
public:
std::shared_ptr<Aws::S3::S3EndpointProviderBase> Lookup(
const Aws::S3::S3ClientConfiguration& config) {
auto key = EndpointConfigKey(config);
CacheValue* value;
{
std::unique_lock lock(mutex_);
value = &cache_[std::move(key)];
}
std::call_once(value->once, [&]() {
auto endpoint_provider = std::make_shared<Aws::S3::S3EndpointProvider>();
endpoint_provider->InitBuiltInParameters(config);
value->endpoint_provider =
std::make_shared<InitOnceEndpointProvider>(std::move(endpoint_provider));
});
return value->endpoint_provider;
}

void Reset() {
std::unique_lock lock(mutex_);
cache_.clear();
}

static EndpointProviderCache* Instance() {
static EndpointProviderCache instance;
return &instance;
}

private:
EndpointProviderCache() = default;

struct CacheValue {
std::once_flag once;
std::shared_ptr<Aws::S3::S3EndpointProviderBase> endpoint_provider;
};

std::mutex mutex_;
std::unordered_map<EndpointConfigKey, CacheValue> cache_;
};

#endif // ARROW_S3_HAS_S3CLIENT_CONFIGURATION

class ClientBuilder {
public:
explicit ClientBuilder(S3Options options) : options_(std::move(options)) {}
@@ -958,9 +1093,6 @@ class ClientBuilder {
client_config_.caPath = ToAwsString(internal::global_options.tls_ca_dir_path);
}

const bool use_virtual_addressing =
options_.endpoint_override.empty() || options_.force_virtual_addressing;

// Set proxy options if provided
if (!options_.proxy_options.scheme.empty()) {
if (options_.proxy_options.scheme == "http") {
@@ -990,10 +1122,20 @@ class ClientBuilder {
client_config_.maxConnections = std::max(io_context->executor()->GetCapacity(), 25);
}

const bool use_virtual_addressing =
options_.endpoint_override.empty() || options_.force_virtual_addressing;

#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION
client_config_.useVirtualAddressing = use_virtual_addressing;
auto endpoint_provider = EndpointProviderCache::Instance()->Lookup(client_config_);
auto client = std::make_shared<S3Client>(credentials_provider_, endpoint_provider,
client_config_);
#else
auto client = std::make_shared<S3Client>(
credentials_provider_, client_config_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
use_virtual_addressing);
#endif
client->s3_retry_strategy_ = options_.retry_strategy;
return GetClientHolder(std::move(client));
}
@@ -1002,7 +1144,11 @@ class ClientBuilder {

protected:
S3Options options_;
#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION
Aws::S3::S3ClientConfiguration client_config_;
#else
Aws::Client::ClientConfiguration client_config_;
#endif
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider_;
};

@@ -2949,6 +3095,9 @@ struct AwsInstance {
"This could lead to a segmentation fault at exit";
}
GetClientFinalizer()->Finalize();
#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION
EndpointProviderCache::Instance()->Reset();
#endif
Aws::ShutdownAPI(aws_options_);
}
}
@@ -3090,5 +3239,4 @@ Result<std::string> ResolveS3BucketRegion(const std::string& bucket) {
return resolver->ResolveRegion(bucket);
}

} // namespace fs
} // namespace arrow
} // namespace arrow::fs
