Skip to content

Commit

Permalink
ARROW-7761: [C++][Python] Support S3 URIs
Browse files Browse the repository at this point in the history
Allow instantiating an S3 filesystem instance from a `s3://user:pass@bucket/path?params...` URI.

Closes #6403 from pitrou/ARROW-7761-s3-uris and squashes the following commits:

1c27624 <François Saint-Jacques> Add multiple fs support to dataset example
3c73b76 <Antoine Pitrou> Add implicit S3 initialization
0fd17ce <Antoine Pitrou> ARROW-7761:  Support S3 URIs

Lead-authored-by: Antoine Pitrou <antoine@python.org>
Co-authored-by: François Saint-Jacques <fsaintjacques@gmail.com>
Signed-off-by: François Saint-Jacques <fsaintjacques@gmail.com>
  • Loading branch information
pitrou and fsaintjacques committed Feb 17, 2020
1 parent 8bc7fe9 commit d6889f7
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 9 deletions.
12 changes: 9 additions & 3 deletions cpp/examples/arrow/dataset-parquet-scan-example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/filter.h>
#include <arrow/dataset/scanner.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/filesystem/filesystem.h>

using arrow::field;
using arrow::int16;
Expand Down Expand Up @@ -65,6 +65,10 @@ struct Configuration {
std::shared_ptr<ds::Expression> filter = ("total_amount"_ > 1000.0f).Copy();
} conf;

std::shared_ptr<fs::FileSystem> GetFileSystemFromUri(const std::string& uri, std::string* path) {
return fs::FileSystemFromUri(uri, path).ValueOrDie();
}

std::shared_ptr<ds::Dataset> GetDatasetFromPath(std::shared_ptr<fs::FileSystem> fs,
std::shared_ptr<ds::FileFormat> format,
std::string path) {
Expand Down Expand Up @@ -113,15 +117,17 @@ std::shared_ptr<Table> GetTableFromScanner(std::shared_ptr<ds::Scanner> scanner)
}

int main(int argc, char** argv) {
auto fs = std::make_shared<fs::LocalFileSystem>();
auto format = std::make_shared<ds::ParquetFileFormat>();

if (argc != 2) {
// Fake success for CI purposes.
return EXIT_SUCCESS;
}

auto dataset = GetDatasetFromPath(fs, format, argv[1]);
std::string path;
auto fs = GetFileSystemFromUri(argv[1], &path);

auto dataset = GetDatasetFromPath(fs, format, path);

auto scanner = GetScannerFromDataset(dataset, conf.projected_columns, conf.filter,
conf.use_threads);
Expand Down
15 changes: 14 additions & 1 deletion cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
#ifdef ARROW_HDFS
#include "arrow/filesystem/hdfs.h"
#endif
#ifdef ARROW_S3
#include "arrow/filesystem/s3fs.h"
#endif
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/filesystem/path_util.h"
Expand Down Expand Up @@ -386,7 +389,17 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const FileSystemUri& f
ARROW_ASSIGN_OR_RAISE(auto hdfs, HadoopFileSystem::Make(options));
return hdfs;
#else
return Status::NotImplemented("Arrow compiled without HDFS support");
return Status::NotImplemented("Got HDFS URI but Arrow compiled without HDFS support");
#endif
}
if (fsuri.scheme == "s3") {
#ifdef ARROW_S3
RETURN_NOT_OK(EnsureS3Initialized());
ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(fsuri.uri, out_path));
ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options));
return s3fs;
#else
return Status::NotImplemented("Got S3 URI but Arrow compiled without S3 support");
#endif
}

Expand Down
90 changes: 85 additions & 5 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <condition_variable>
#include <mutex>
#include <sstream>
#include <unordered_map>
#include <utility>

#ifdef _WIN32
Expand Down Expand Up @@ -71,6 +72,9 @@
#include "arrow/util/logging.h"

namespace arrow {

using internal::Uri;

namespace fs {

using ::Aws::Client::AWSError;
Expand All @@ -91,12 +95,13 @@ const char* kS3DefaultRegion = "us-east-1";

static const char kSep = '/';

static std::mutex aws_init_lock;
static Aws::SDKOptions aws_options;
static std::atomic<bool> aws_initialized(false);
namespace {

std::mutex aws_init_lock;
Aws::SDKOptions aws_options;
std::atomic<bool> aws_initialized(false);

Status InitializeS3(const S3GlobalOptions& options) {
std::lock_guard<std::mutex> lock(aws_init_lock);
Status DoInitializeS3(const S3GlobalOptions& options) {
Aws::Utils::Logging::LogLevel aws_log_level;

#define LOG_LEVEL_CASE(level_name) \
Expand Down Expand Up @@ -128,13 +133,29 @@ Status InitializeS3(const S3GlobalOptions& options) {
return Status::OK();
}

} // namespace

Status InitializeS3(const S3GlobalOptions& options) {
std::lock_guard<std::mutex> lock(aws_init_lock);
return DoInitializeS3(options);
}

Status FinalizeS3() {
std::lock_guard<std::mutex> lock(aws_init_lock);
Aws::ShutdownAPI(aws_options);
aws_initialized.store(false);
return Status::OK();
}

Status EnsureS3Initialized() {
std::lock_guard<std::mutex> lock(aws_init_lock);
if (!aws_initialized.load()) {
S3GlobalOptions options{S3LogLevel::Fatal};
return DoInitializeS3(options);
}
return Status::OK();
}

void S3Options::ConfigureDefaultCredentials() {
credentials_provider =
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
Expand All @@ -159,6 +180,65 @@ S3Options S3Options::FromAccessKey(const std::string& access_key,
return options;
}

Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
S3Options options;

const auto bucket = uri.host();
auto path = uri.path();
if (bucket.empty()) {
if (!path.empty()) {
return Status::Invalid("Missing bucket name in S3 URI");
}
} else {
if (path.empty()) {
path = bucket;
} else {
if (path[0] != '/') {
return Status::Invalid("S3 URI should absolute, not relative");
}
path = bucket + path;
}
}
if (out_path != nullptr) {
*out_path = std::string(internal::RemoveTrailingSlash(path));
}

std::unordered_map<std::string, std::string> options_map;
ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
for (const auto& kv : options_items) {
options_map.emplace(kv.first, kv.second);
}

const auto username = uri.username();
if (!username.empty()) {
options.ConfigureAccessKey(username, uri.password());
} else {
options.ConfigureDefaultCredentials();
}

auto it = options_map.find("region");
if (it != options_map.end()) {
options.region = it->second;
}
it = options_map.find("scheme");
if (it != options_map.end()) {
options.scheme = it->second;
}
it = options_map.find("endpoint_override");
if (it != options_map.end()) {
options.endpoint_override = it->second;
}

return options;
}

Result<S3Options> S3Options::FromUri(const std::string& uri_string,
std::string* out_path) {
Uri uri;
RETURN_NOT_OK(uri.Parse(uri_string));
return FromUri(uri, out_path);
}

namespace {

Status CheckS3Initialized() {
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "arrow/filesystem/filesystem.h"
#include "arrow/util/macros.h"
#include "arrow/util/uri.h"

namespace Aws {
namespace Auth {
Expand Down Expand Up @@ -68,6 +69,11 @@ struct ARROW_EXPORT S3Options {
/// \brief Initialize with explicit access and secret key
static S3Options FromAccessKey(const std::string& access_key,
const std::string& secret_key);

static Result<S3Options> FromUri(const ::arrow::internal::Uri& uri,
std::string* out_path = NULLPTR);
static Result<S3Options> FromUri(const std::string& uri,
std::string* out_path = NULLPTR);
};

/// S3-backed FileSystem implementation.
Expand Down Expand Up @@ -145,6 +151,11 @@ struct ARROW_EXPORT S3GlobalOptions {
ARROW_EXPORT
Status InitializeS3(const S3GlobalOptions& options);

/// Ensure the S3 APIs are initialized, but only if not already done.
/// If necessary, this will call InitializeS3() with some default options.
ARROW_EXPORT
Status EnsureS3Initialized();

/// Shutdown the S3 APIs.
ARROW_EXPORT
Status FinalizeS3();
Expand Down
64 changes: 64 additions & 0 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/core/client/RetryStrategy.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/s3/S3Client.h>
Expand All @@ -68,6 +69,7 @@ namespace fs {

using ::arrow::internal::PlatformFilename;
using ::arrow::internal::TemporaryDir;
using ::arrow::internal::UriEscape;

using ::arrow::fs::internal::ConnectRetryStrategy;
using ::arrow::fs::internal::ErrorToStatus;
Expand Down Expand Up @@ -237,6 +239,54 @@ void AssertObjectContents(Aws::S3::S3Client* client, const std::string& bucket,
AssertGetObject(result, expected);
}

////////////////////////////////////////////////////////////////////////////
// S3Options tests

TEST(S3Options, FromUri) {
std::string path;
S3Options options;

ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://", &path));
ASSERT_EQ(options.region, kS3DefaultRegion);
ASSERT_EQ(options.scheme, "https");
ASSERT_EQ(options.endpoint_override, "");
ASSERT_EQ(path, "");

ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3:", &path));
ASSERT_EQ(path, "");

ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://access:secret@mybucket", &path));
ASSERT_EQ(path, "mybucket");
const auto creds = options.credentials_provider->GetAWSCredentials();
ASSERT_EQ(creds.GetAWSAccessKeyId(), "access");
ASSERT_EQ(creds.GetAWSSecretKey(), "secret");

ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://mybucket/", &path));
ASSERT_EQ(options.region, kS3DefaultRegion);
ASSERT_EQ(options.scheme, "https");
ASSERT_EQ(options.endpoint_override, "");
ASSERT_EQ(path, "mybucket");

ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://mybucket/foo/bar/", &path));
ASSERT_EQ(options.region, kS3DefaultRegion);
ASSERT_EQ(options.scheme, "https");
ASSERT_EQ(options.endpoint_override, "");
ASSERT_EQ(path, "mybucket/foo/bar");

ASSERT_OK_AND_ASSIGN(
options,
S3Options::FromUri(
"s3://mybucket/foo/bar/?region=utopia&endpoint_override=localhost&scheme=http",
&path));
ASSERT_EQ(options.region, "utopia");
ASSERT_EQ(options.scheme, "http");
ASSERT_EQ(options.endpoint_override, "localhost");
ASSERT_EQ(path, "mybucket/foo/bar");

// Missing bucket name
ASSERT_RAISES(Invalid, S3Options::FromUri("s3:///foo/bar/", &path));
}

////////////////////////////////////////////////////////////////////////////
// Basic test for the Minio test server.

Expand Down Expand Up @@ -758,6 +808,20 @@ TEST_F(TestS3FS, OpenOutputStreamDestructorSyncWrite) {
TestOpenOutputStreamDestructor();
}

TEST_F(TestS3FS, FileSystemFromUri) {
std::stringstream ss;
ss << "s3://" << minio_.access_key() << ":" << minio_.secret_key()
<< "@bucket/somedir/subdir/subfile"
<< "?scheme=http&endpoint_override=" << UriEscape(minio_.connect_string());

std::string path;
ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri(ss.str(), &path));
ASSERT_EQ(path, "bucket/somedir/subdir/subfile");

// Check the filesystem has the right connection parameters
AssertFileStats(fs.get(), path, FileType::File, 8);
}

////////////////////////////////////////////////////////////////////////////
// Generic S3 tests

Expand Down
30 changes: 30 additions & 0 deletions cpp/src/arrow/util/uri.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ bool IsTextRangeSet(const UriTextRangeStructA& range) { return range.first != nu

} // namespace

std::string UriEscape(const std::string& s) {
std::string escaped;
escaped.resize(3 * s.length());

auto end = uriEscapeExA(s.data(), s.data() + s.length(), &escaped[0],
/*spaceToPlus=*/URI_FALSE, /*normalizeBreaks=*/URI_FALSE);
escaped.resize(end - &escaped[0]);
return escaped;
}

struct Uri::Impl {
Impl() : string_rep_(""), port_(-1) { memset(&uri_, 0, sizeof(uri_)); }

Expand Down Expand Up @@ -95,6 +105,26 @@ std::string Uri::port_text() const { return TextRangeToString(impl_->uri_.portTe

int32_t Uri::port() const { return impl_->port_; }

std::string Uri::username() const {
auto userpass = TextRangeToView(impl_->uri_.userInfo);
auto sep_pos = userpass.find_first_of(':');
if (sep_pos == util::string_view::npos) {
return std::string(userpass);
} else {
return std::string(userpass.substr(0, sep_pos));
}
}

std::string Uri::password() const {
auto userpass = TextRangeToView(impl_->uri_.userInfo);
auto sep_pos = userpass.find_first_of(':');
if (sep_pos == util::string_view::npos) {
return std::string();
} else {
return std::string(userpass.substr(sep_pos + 1));
}
}

std::string Uri::path() const {
// Gather path segments
std::vector<util::string_view> segments;
Expand Down
Loading

0 comments on commit d6889f7

Please sign in to comment.