Skip to content

Commit

Permalink
ARROW-8059: [Python] Make FileSystem objects serializable
Browse files Browse the repository at this point in the history
Also:
- implemented Equals on the filesystem objects
- removed the *Options objects in favor of keyword arguments
- support passing more options as query parameters when instantiating hdfs filesystem from a URI
- exposed more properties of S3FS, SubtreeFS and HDFS

Closes #6644 from kszucs/ARROW-8059

Authored-by: Krisztián Szűcs <szucs.krisztian@gmail.com>
Signed-off-by: Krisztián Szűcs <szucs.krisztian@gmail.com>
  • Loading branch information
kszucs committed Mar 25, 2020
1 parent b07c262 commit 815531c
Show file tree
Hide file tree
Showing 20 changed files with 442 additions and 336 deletions.
14 changes: 14 additions & 0 deletions cpp/src/arrow/filesystem/filesystem.cc
Expand Up @@ -32,6 +32,7 @@
#include "arrow/io/slow.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/uri.h"
Expand Down Expand Up @@ -151,6 +152,17 @@ Result<std::string> SubTreeFileSystem::NormalizeBasePath(
return EnsureTrailingSlash(std::move(base_path));
}

/// Two subtree filesystems are equal when they share the same base path and
/// their base filesystems compare equal.
bool SubTreeFileSystem::Equals(const FileSystem& other) const {
  // An object always equals itself.
  if (&other == this) return true;
  // Differing type names rule out equality and guard the downcast below.
  if (type_name() != other.type_name()) return false;

  const auto& rhs = ::arrow::internal::checked_cast<const SubTreeFileSystem&>(other);
  const bool same_base_path = (base_path_ == rhs.base_path_);
  return same_base_path && base_fs_->Equals(rhs.base_fs_);
}

std::string SubTreeFileSystem::PrependBase(const std::string& s) const {
if (s.empty()) {
return base_path_;
Expand Down Expand Up @@ -288,6 +300,8 @@ SlowFileSystem::SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
double average_latency, int32_t seed)
: base_fs_(base_fs), latencies_(io::LatencyGenerator::Make(average_latency, seed)) {}

bool SlowFileSystem::Equals(const FileSystem& other) const { return this == &other; }

Result<FileInfo> SlowFileSystem::GetFileInfo(const std::string& path) {
latencies_->Sleep();
return base_fs_->GetFileInfo(path);
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/filesystem/filesystem.h
Expand Up @@ -168,6 +168,12 @@ class ARROW_EXPORT FileSystem {
/// may allow normalizing irregular path forms (such as Windows local paths).
virtual Result<std::string> NormalizePath(std::string path);

/// \brief Return whether this filesystem is equal to `other`.
///
/// Pure virtual: every concrete filesystem supplies its own definition of
/// equality.
virtual bool Equals(const FileSystem& other) const = 0;

virtual bool Equals(const std::shared_ptr<FileSystem>& other) const {
return Equals(*other);
}

/// Get info for the given target.
///
/// Any symlink is automatically dereferenced, recursively.
Expand Down Expand Up @@ -258,9 +264,13 @@ class ARROW_EXPORT SubTreeFileSystem : public FileSystem {
~SubTreeFileSystem() override;

/// Type name of this filesystem: "subtree".
std::string type_name() const override { return "subtree"; }
/// Return a copy of the stored base path (base_path_).
std::string base_path() const { return base_path_; }
/// Return the shared pointer to the base filesystem (base_fs_).
std::shared_ptr<FileSystem> base_fs() const { return base_fs_; }

Result<std::string> NormalizePath(std::string path) override;

/// Equal when `other` is also a subtree filesystem with the same base path
/// and an equal base filesystem.
bool Equals(const FileSystem& other) const override;

/// \cond FALSE
using FileSystem::GetFileInfo;
/// \endcond
Expand Down Expand Up @@ -313,6 +323,7 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem {
int32_t seed);

/// Type name of this filesystem: "slow".
std::string type_name() const override { return "slow"; }
/// Identity comparison: true only when `other` is this very object.
bool Equals(const FileSystem& other) const override;

using FileSystem::GetFileInfo;
Result<FileInfo> GetFileInfo(const std::string& path) override;
Expand Down
65 changes: 60 additions & 5 deletions cpp/src/arrow/filesystem/hdfs.cc
Expand Up @@ -24,6 +24,7 @@
#include "arrow/filesystem/path_util.h"
#include "arrow/io/hdfs.h"
#include "arrow/io/hdfs_internal.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/parsing.h"
#include "arrow/util/windows_fixup.h"
Expand Down Expand Up @@ -63,6 +64,8 @@ class HadoopFileSystem::Impl {
return Status::OK();
}

// Return a copy of the options stored in this implementation object.
HdfsOptions options() const { return options_; }

Result<FileInfo> GetFileInfo(const std::string& path) {
FileInfo info;
io::HdfsPathInfo path_info;
Expand Down Expand Up @@ -270,6 +273,16 @@ void HdfsOptions::ConfigureHdfsBlockSize(int64_t default_block_size) {
this->default_block_size = default_block_size;
}

/// Field-by-field comparison of two option sets: the tuning values
/// (buffer size, replication, default block size) and every member of the
/// connection configuration must match.
bool HdfsOptions::Equals(const HdfsOptions& other) const {
  const bool same_tuning = buffer_size == other.buffer_size &&
                           replication == other.replication &&
                           default_block_size == other.default_block_size;
  if (!same_tuning) {
    return false;
  }

  const auto& mine = connection_config;
  const auto& theirs = other.connection_config;
  return mine.host == theirs.host && mine.port == theirs.port &&
         mine.user == theirs.user && mine.kerb_ticket == theirs.kerb_ticket &&
         mine.extra_conf == theirs.extra_conf;
}

Result<HdfsOptions> HdfsOptions::FromUri(const Uri& uri) {
HdfsOptions options;

Expand All @@ -282,6 +295,7 @@ Result<HdfsOptions> HdfsOptions::FromUri(const Uri& uri) {
std::string host;
host = uri.scheme() + "://" + uri.host();

// configure endpoint
const auto port = uri.port();
if (port == -1) {
// default port will be determined by hdfs FileSystem impl
Expand All @@ -290,21 +304,49 @@ Result<HdfsOptions> HdfsOptions::FromUri(const Uri& uri) {
options.ConfigureEndPoint(host, port);
}

// configure replication
auto it = options_map.find("replication");
if (it != options_map.end()) {
const auto& v = it->second;
::arrow::internal::StringConverter<Int16Type> converter;
int16_t reps;
if (!converter(v.data(), v.size(), &reps)) {
int16_t replication;
if (!converter(v.data(), v.size(), &replication)) {
return Status::Invalid("Invalid value for option 'replication': '", v, "'");
}
options.ConfigureHdfsReplication(reps);
options.ConfigureHdfsReplication(replication);
}
it = options_map.find("user");

// configure buffer_size
it = options_map.find("buffer_size");
if (it != options_map.end()) {
const auto& v = it->second;
options.ConfigureHdfsUser(v);
::arrow::internal::StringConverter<Int32Type> converter;
int32_t buffer_size;
if (!converter(v.data(), v.size(), &buffer_size)) {
return Status::Invalid("Invalid value for option 'buffer_size': '", v, "'");
}
options.ConfigureHdfsBufferSize(buffer_size);
}

// configure default_block_size
it = options_map.find("default_block_size");
if (it != options_map.end()) {
const auto& v = it->second;
::arrow::internal::StringConverter<Int64Type> converter;
int64_t default_block_size;
if (!converter(v.data(), v.size(), &default_block_size)) {
return Status::Invalid("Invalid value for option 'default_block_size': '", v, "'");
}
options.ConfigureHdfsBlockSize(default_block_size);
}

// configure user
it = options_map.find("user");
if (it != options_map.end()) {
const auto& user = it->second;
options.ConfigureHdfsUser(user);
}

return options;
}

Expand All @@ -330,6 +372,19 @@ Result<FileInfo> HadoopFileSystem::GetFileInfo(const std::string& path) {
return impl_->GetFileInfo(path);
}

/// Return the options held by the implementation object.
HdfsOptions HadoopFileSystem::options() const {
  return impl_->options();
}

bool HadoopFileSystem::Equals(const FileSystem& other) const {
if (this == &other) {
return true;
}
if (other.type_name() != type_name()) {
return false;
}
const auto& hdfs = ::arrow::internal::checked_cast<const HadoopFileSystem&>(other);
return options().Equals(hdfs.options());
}

// Selector-based overload: forwards the selector to the implementation
// object and returns whatever it produces.
Result<std::vector<FileInfo>> HadoopFileSystem::GetFileInfo(const FileSelector& select) {
return impl_->GetFileInfo(select);
}
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/filesystem/hdfs.h
Expand Up @@ -47,6 +47,8 @@ struct ARROW_EXPORT HdfsOptions {
void ConfigureHdfsBufferSize(int32_t buffer_size);
void ConfigureHdfsBlockSize(int64_t default_block_size);

/// Field-by-field comparison: buffer size, replication, default block size
/// and the connection configuration (host, port, user, kerb_ticket,
/// extra_conf) must all match.
bool Equals(const HdfsOptions& other) const;

static Result<HdfsOptions> FromUri(const ::arrow::internal::Uri& uri);
static Result<HdfsOptions> FromUri(const std::string& uri);
};
Expand All @@ -60,6 +62,8 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem {
~HadoopFileSystem() override;

/// Type name of this filesystem: "hdfs".
std::string type_name() const override { return "hdfs"; }
/// Return the options held by the implementation object.
HdfsOptions options() const;
/// Equal when `other` is also an HDFS filesystem whose options compare equal.
bool Equals(const FileSystem& other) const override;

/// \cond FALSE
using FileSystem::GetFileInfo;
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/filesystem/localfs.cc
Expand Up @@ -233,6 +233,10 @@ LocalFileSystemOptions LocalFileSystemOptions::Defaults() {
return LocalFileSystemOptions();
}

/// Options are equal when their `use_mmap` settings agree.
bool LocalFileSystemOptions::Equals(const LocalFileSystemOptions& other) const {
  const bool same_mmap_setting = (other.use_mmap == use_mmap);
  return same_mmap_setting;
}

// Default constructor: initializes options_ from LocalFileSystemOptions::Defaults().
LocalFileSystem::LocalFileSystem() : options_(LocalFileSystemOptions::Defaults()) {}

LocalFileSystem::LocalFileSystem(const LocalFileSystemOptions& options)
Expand All @@ -245,6 +249,15 @@ Result<std::string> LocalFileSystem::NormalizePath(std::string path) {
return fn.ToString();
}

bool LocalFileSystem::Equals(const FileSystem& other) const {
if (other.type_name() != type_name()) {
return false;
} else {
const auto& localfs = ::arrow::internal::checked_cast<const LocalFileSystem&>(other);
return options_.Equals(localfs.options());
}
}

Result<FileInfo> LocalFileSystem::GetFileInfo(const std::string& path) {
ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path));
return StatFile(fn.ToNative());
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/filesystem/localfs.h
Expand Up @@ -34,6 +34,8 @@ struct ARROW_EXPORT LocalFileSystemOptions {

/// \brief Initialize with defaults
static LocalFileSystemOptions Defaults();

/// Equal when both option sets have the same `use_mmap` setting.
bool Equals(const LocalFileSystemOptions& other) const;
};

/// \brief A FileSystem implementation accessing files on the local machine.
Expand All @@ -52,6 +54,10 @@ class ARROW_EXPORT LocalFileSystem : public FileSystem {

Result<std::string> NormalizePath(std::string path) override;

/// Equal when `other` is also a local filesystem and its options compare equal.
bool Equals(const FileSystem& other) const override;

/// Return a copy of the options this filesystem holds.
LocalFileSystemOptions options() const { return options_; }

/// \cond FALSE
using FileSystem::GetFileInfo;
/// \endcond
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/filesystem/mockfs.cc
Expand Up @@ -383,6 +383,8 @@ MockFileSystem::MockFileSystem(TimePoint current_time) {
impl_ = std::unique_ptr<Impl>(new Impl(current_time));
}

bool MockFileSystem::Equals(const FileSystem& other) const { return this == &other; }

Status MockFileSystem::CreateDir(const std::string& path, bool recursive) {
auto parts = SplitAbstractPath(path);
RETURN_NOT_OK(ValidateAbstractPathParts(parts));
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/filesystem/mockfs.h
Expand Up @@ -63,6 +63,8 @@ class ARROW_EXPORT MockFileSystem : public FileSystem {

/// Type name of this filesystem: "mock".
std::string type_name() const override { return "mock"; }

/// Identity comparison: true only when `other` is this very object.
bool Equals(const FileSystem& other) const override;

// XXX It's not very practical to have to explicitly declare inheritance
// of default overrides.
using FileSystem::GetFileInfo;
Expand Down
33 changes: 33 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.cc
Expand Up @@ -69,6 +69,7 @@
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/windows_fixup.h"

Expand Down Expand Up @@ -168,6 +169,16 @@ void S3Options::ConfigureAccessKey(const std::string& access_key,
ToAwsString(access_key), ToAwsString(secret_key));
}

/// Return the access key id reported by the configured credentials provider.
std::string S3Options::GetAccessKey() const {
  auto creds = credentials_provider->GetAWSCredentials();
  const auto& aws_key_id = creds.GetAWSAccessKeyId();
  return std::string(FromAwsString(aws_key_id));
}

/// Return the secret key reported by the configured credentials provider.
std::string S3Options::GetSecretKey() const {
  auto creds = credentials_provider->GetAWSCredentials();
  const auto& aws_secret = creds.GetAWSSecretKey();
  return std::string(FromAwsString(aws_secret));
}

S3Options S3Options::Defaults() {
S3Options options;
options.ConfigureDefaultCredentials();
Expand Down Expand Up @@ -240,6 +251,13 @@ Result<S3Options> S3Options::FromUri(const std::string& uri_string,
return FromUri(uri, out_path);
}

/// Compare two S3 option sets: region, endpoint override, scheme, the
/// background-writes flag, and the access/secret keys must all match.
bool S3Options::Equals(const S3Options& other) const {
  // Plain settings are compared first; the credentials provider is only
  // queried once these all agree.
  const bool same_settings =
      region == other.region && endpoint_override == other.endpoint_override &&
      scheme == other.scheme && background_writes == other.background_writes;
  if (!same_settings) {
    return false;
  }
  if (!(GetAccessKey() == other.GetAccessKey())) {
    return false;
  }
  return GetSecretKey() == other.GetSecretKey();
}

namespace {

Status CheckS3Initialized() {
Expand Down Expand Up @@ -836,6 +854,8 @@ class S3FileSystem::Impl {
return Status::OK();
}

// Return a copy of the options stored in this implementation object.
S3Options options() const { return options_; }

// Create a bucket. Successful if bucket already exists.
Status CreateBucket(const std::string& bucket) {
S3Model::CreateBucketConfiguration config;
Expand Down Expand Up @@ -1190,6 +1210,19 @@ Result<std::shared_ptr<S3FileSystem>> S3FileSystem::Make(const S3Options& option
return ptr;
}

bool S3FileSystem::Equals(const FileSystem& other) const {
if (this == &other) {
return true;
}
if (other.type_name() != type_name()) {
return false;
}
const auto& s3fs = ::arrow::internal::checked_cast<const S3FileSystem&>(other);
return options().Equals(s3fs.options());
}

/// Return the options held by the implementation object.
S3Options S3FileSystem::options() const {
  return impl_->options();
}

Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
S3Path path;
RETURN_NOT_OK(S3Path::FromString(s, &path));
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.h
Expand Up @@ -61,6 +61,11 @@ struct ARROW_EXPORT S3Options {
/// Configure with explicit access and secret key.
void ConfigureAccessKey(const std::string& access_key, const std::string& secret_key);

/// Access key id obtained from the credentials provider.
std::string GetAccessKey() const;
/// Secret key obtained from the credentials provider.
std::string GetSecretKey() const;

/// Equal when region, endpoint override, scheme, the background-writes flag,
/// access key and secret key all match.
bool Equals(const S3Options& other) const;

/// \brief Initialize with default credentials provider chain
///
/// This is recommended if you use the standard AWS environment variables
Expand All @@ -86,6 +91,9 @@ class ARROW_EXPORT S3FileSystem : public FileSystem {
~S3FileSystem() override;

/// Type name of this filesystem: "s3".
std::string type_name() const override { return "s3"; }
/// Return the options held by the implementation object.
S3Options options() const;

/// Equal when `other` is also an S3 filesystem whose options compare equal.
bool Equals(const FileSystem& other) const override;

/// \cond FALSE
using FileSystem::GetFileInfo;
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/_fs.pxd
Expand Up @@ -57,7 +57,7 @@ cdef class FileSystem:
cdef init(self, const shared_ptr[CFileSystem]& wrapped)

@staticmethod
cdef wrap(shared_ptr[CFileSystem]& sp)
cdef wrap(const shared_ptr[CFileSystem]& sp)

cdef inline shared_ptr[CFileSystem] unwrap(self) nogil

Expand Down

0 comments on commit 815531c

Please sign in to comment.