diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 2ba64ee22f54f..640888e1c4fa5 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -99,12 +99,21 @@ #define ARROW_S3_HAS_CRT #endif +#if ARROW_AWS_SDK_VERSION_CHECK(1, 10, 0) +#define ARROW_S3_HAS_S3CLIENT_CONFIGURATION +#endif + #ifdef ARROW_S3_HAS_CRT #include #include #include #endif +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION +#include +#include +#endif + #include "arrow/util/windows_fixup.h" #include "arrow/buffer.h" @@ -128,19 +137,17 @@ #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" -namespace arrow { - -using internal::TaskGroup; -using internal::ToChars; -using io::internal::SubmitIO; -using util::Uri; - -namespace fs { +namespace arrow::fs { using ::Aws::Client::AWSError; using ::Aws::S3::S3Errors; namespace S3Model = Aws::S3::Model; +using ::arrow::internal::TaskGroup; +using ::arrow::internal::ToChars; +using ::arrow::io::internal::SubmitIO; +using ::arrow::util::Uri; + using internal::ConnectRetryStrategy; using internal::DetectS3Backend; using internal::ErrorToStatus; @@ -913,6 +920,134 @@ Result> GetClientHolder( // ----------------------------------------------------------------------- // S3 client factory: build S3Client from S3Options +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION + +// GH-40279: standard initialization of S3Client creates a new `S3EndpointProvider` +// every time. Its construction takes 1ms, which makes instantiating every S3Client +// very costly (see upstream bug report +// at https://github.com/aws/aws-sdk-cpp/issues/2880). +// To work around this, we build and cache `S3EndpointProvider` instances +// for each distinct endpoint configuration, and reuse them whenever possible. +// Since most applications tend to use a single endpoint configuration, this +// makes the 1ms setup cost a once-per-process overhead, making it much more +// bearable - if not ideal. + +struct EndpointConfigKey { + explicit EndpointConfigKey(const Aws::S3::S3ClientConfiguration& config) + : region(config.region), + scheme(config.scheme), + endpoint_override(config.endpointOverride), + use_virtual_addressing(config.useVirtualAddressing) {} + + Aws::String region; + Aws::Http::Scheme scheme; + Aws::String endpoint_override; + bool use_virtual_addressing; + + bool operator==(const EndpointConfigKey& other) const noexcept { + return region == other.region && scheme == other.scheme && + endpoint_override == other.endpoint_override && + use_virtual_addressing == other.use_virtual_addressing; + } +}; + +} // namespace +} // namespace arrow::fs + +template <> +struct std::hash { + std::size_t operator()(const arrow::fs::EndpointConfigKey& key) const noexcept { + // A crude hash is sufficient since we expect the cache to remain very small. + auto h = std::hash{}; + return h(key.region) ^ h(key.endpoint_override); + } +}; + +namespace arrow::fs { +namespace { + +// EndpointProvider configuration happens in a non-thread-safe way, even +// when the updates are idempotent. This is a problem when trying to reuse +// a single EndpointProvider from several clients. +// To work around this, this class ensures reconfiguration of an existing +// EndpointProvider is a no-op. +class InitOnceEndpointProvider : public Aws::S3::S3EndpointProviderBase { + public: + explicit InitOnceEndpointProvider( + std::shared_ptr wrapped) + : wrapped_(std::move(wrapped)) {} + + void InitBuiltInParameters(const Aws::S3::S3ClientConfiguration& config) override {} + + void OverrideEndpoint(const Aws::String& endpoint) override { + ARROW_LOG(ERROR) << "unexpected call to InitOnceEndpointProvider::OverrideEndpoint"; + } + Aws::S3::Endpoint::S3ClientContextParameters& AccessClientContextParameters() override { + ARROW_LOG(ERROR) + << "unexpected call to InitOnceEndpointProvider::AccessClientContextParameters"; + // Need to return a reference to something... + return wrapped_->AccessClientContextParameters(); + } + + const Aws::S3::Endpoint::S3ClientContextParameters& GetClientContextParameters() + const override { + return wrapped_->GetClientContextParameters(); + } + Aws::Endpoint::ResolveEndpointOutcome ResolveEndpoint( + const Aws::Endpoint::EndpointParameters& params) const override { + return wrapped_->ResolveEndpoint(params); + } + + protected: + std::shared_ptr wrapped_; +}; + +// A class that instantiates a single EndpointProvider per distinct endpoint +// configuration and initializes it in a thread-safe way. See earlier comments +// for rationale. +class EndpointProviderCache { + public: + std::shared_ptr Lookup( + const Aws::S3::S3ClientConfiguration& config) { + auto key = EndpointConfigKey(config); + CacheValue* value; + { + std::unique_lock lock(mutex_); + value = &cache_[std::move(key)]; + } + std::call_once(value->once, [&]() { + auto endpoint_provider = std::make_shared(); + endpoint_provider->InitBuiltInParameters(config); + value->endpoint_provider = + std::make_shared(std::move(endpoint_provider)); + }); + return value->endpoint_provider; + } + + void Reset() { + std::unique_lock lock(mutex_); + cache_.clear(); + } + + static EndpointProviderCache* Instance() { + static EndpointProviderCache instance; + return &instance; + } + + private: + EndpointProviderCache() = default; + + struct CacheValue { + std::once_flag once; + std::shared_ptr endpoint_provider; + }; + + std::mutex mutex_; + std::unordered_map cache_; +}; + +#endif // ARROW_S3_HAS_S3CLIENT_CONFIGURATION + class ClientBuilder { public: explicit ClientBuilder(S3Options options) : options_(std::move(options)) {} @@ -958,9 +1093,6 @@ class ClientBuilder { client_config_.caPath = ToAwsString(internal::global_options.tls_ca_dir_path); } - const bool use_virtual_addressing = - options_.endpoint_override.empty() || options_.force_virtual_addressing; - // Set proxy options if provided if (!options_.proxy_options.scheme.empty()) { if (options_.proxy_options.scheme == "http") { @@ -990,10 +1122,20 @@ class ClientBuilder { client_config_.maxConnections = std::max(io_context->executor()->GetCapacity(), 25); } + const bool use_virtual_addressing = + options_.endpoint_override.empty() || options_.force_virtual_addressing; + +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION + client_config_.useVirtualAddressing = use_virtual_addressing; + auto endpoint_provider = EndpointProviderCache::Instance()->Lookup(client_config_); + auto client = std::make_shared(credentials_provider_, endpoint_provider, + client_config_); +#else auto client = std::make_shared( credentials_provider_, client_config_, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing); +#endif client->s3_retry_strategy_ = options_.retry_strategy; return GetClientHolder(std::move(client)); } @@ -1002,7 +1144,11 @@ class ClientBuilder { protected: S3Options options_; +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION + Aws::S3::S3ClientConfiguration client_config_; +#else Aws::Client::ClientConfiguration client_config_; +#endif std::shared_ptr credentials_provider_; }; @@ -2949,6 +3095,9 @@ struct AwsInstance { "This could lead to a segmentation fault at exit"; } GetClientFinalizer()->Finalize(); +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION + EndpointProviderCache::Instance()->Reset(); +#endif Aws::ShutdownAPI(aws_options_); } } @@ -3090,5 +3239,4 @@ Result ResolveS3BucketRegion(const std::string& bucket) { return resolver->ResolveRegion(bucket); } -} // namespace fs -} // namespace arrow +} // namespace arrow::fs