Skip to content

Commit

Permalink
Addressing review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed Jul 20, 2023
1 parent 6e75a6e commit 7edea03
Showing 1 changed file with 20 additions and 23 deletions.
43 changes: 20 additions & 23 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1953,10 +1953,9 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
FileInfoSink files_queue;
const bool allow_not_found;
const int max_recursion;

const bool include_implicit_dirs;
const io::IOContext io_context;
S3FileSystem::Impl* self;
S3ClientHolder* const holder;

S3Model::ListObjectsV2Request req;
std::unordered_set<std::string> directories;
Expand All @@ -1965,13 +1964,13 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
FileListerState(PushGenerator<std::vector<FileInfo>>::Producer files_queue,
FileSelector select, const std::string& bucket,
const std::string& key, bool include_implicit_dirs,
io::IOContext io_context, S3FileSystem::Impl* self)
io::IOContext io_context, S3ClientHolder* holder)
: files_queue(std::move(files_queue)),
allow_not_found(select.allow_not_found),
max_recursion(select.max_recursion),
include_implicit_dirs(include_implicit_dirs),
io_context(io_context),
self(self) {
io_context(std::move(io_context)),
holder(holder) {
req.SetBucket(bucket);
req.SetMaxKeys(kListObjectsMaxKeys);
if (!key.empty()) {
Expand Down Expand Up @@ -2035,7 +2034,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

FileListerTask(std::shared_ptr<FileListerState> state,
util::AsyncTaskScheduler* scheduler)
: state(state), scheduler(scheduler) {}
: state(std::move(state)), scheduler(scheduler) {}

std::vector<FileInfo> ToFileInfos(const std::string& bucket,
const std::string& prefix,
Expand Down Expand Up @@ -2128,7 +2127,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
void Run() {
// We are on an I/O thread now so just synchronously make the call and interpret the
// results.
Result<S3ClientLock> client_lock = state->self->holder_->Lock();
Result<S3ClientLock> client_lock = state->holder->Lock();
if (!client_lock.ok()) {
state->files_queue.Push(client_lock.status());
return;
Expand Down Expand Up @@ -2188,15 +2187,15 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// will exist even if there are no file objects with these paths.
void ListAsync(const FileSelector& select, const std::string& bucket,
const std::string& key, bool include_implicit_dirs,
util::AsyncTaskScheduler* scheduler, FileInfoSink sink,
S3FileSystem::Impl* self) {
util::AsyncTaskScheduler* scheduler, FileInfoSink sink) {
// We can only fetch kListObjectsMaxKeys files at a time and so we create a
// scheduler and schedule a task to grab the first batch. Once that's done we
// schedule a new task for the next batch. All of these tasks share the same
// FileListerState object but none of these tasks run in parallel so there is
// no need to worry about mutexes
auto state = std::make_shared<FileListerState>(
sink, select, bucket, key, include_implicit_dirs, io_context_, self);
auto state = std::make_shared<FileListerState>(sink, select, bucket, key,
include_implicit_dirs, io_context_,
this->holder_.get());

// Create the first file lister task (it may spawn more)
auto file_lister_task = std::make_unique<FileListerTask>(state, scheduler);
Expand All @@ -2205,13 +2204,12 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

// Fully list all files from all buckets
void FullListAsync(bool include_implicit_dirs, util::AsyncTaskScheduler* scheduler,
FileInfoSink sink, io::IOContext io_context, bool recursive,
S3FileSystem::Impl* self) {
FileInfoSink sink, bool recursive) {
scheduler->AddSimpleTask(
[scheduler, sink, io_context, include_implicit_dirs, recursive, self]() mutable {
return self->ListBucketsAsync(io_context)
.Then([self, scheduler, sink, include_implicit_dirs,
recursive](const std::vector<std::string>& buckets) mutable {
[this, scheduler, sink, include_implicit_dirs, recursive]() mutable {
return ListBucketsAsync().Then(
[this, scheduler, sink, include_implicit_dirs,
recursive](const std::vector<std::string>& buckets) mutable {
// Return the buckets themselves as directories
std::vector<FileInfo> buckets_as_directories =
MakeDirectoryInfos(buckets);
Expand All @@ -2225,8 +2223,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
select.allow_not_found = true;
select.recursive = true;
select.base_dir = bucket;
self->ListAsync(select, bucket, "", include_implicit_dirs, scheduler,
sink, self);
ListAsync(select, bucket, "", include_implicit_dirs, scheduler, sink);
}
}
});
Expand Down Expand Up @@ -2413,10 +2410,10 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
if (base_path.empty()) {
bool should_recurse = select.recursive && select.max_recursion > 0;
self->FullListAsync(/*include_implicit_dirs=*/true, scheduler, sink,
self->io_context_, should_recurse, self);
should_recurse);
} else {
self->ListAsync(select, base_path.bucket, base_path.key,
/*include_implicit_dirs=*/true, scheduler, sink, self);
/*include_implicit_dirs=*/true, scheduler, sink);
}
return Status::OK();
});
Expand Down Expand Up @@ -2465,13 +2462,13 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
return ProcessListBuckets(client_lock.Move()->ListBuckets());
}

Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
Future<std::vector<std::string>> ListBucketsAsync() {
auto deferred =
[self = shared_from_this()]() mutable -> Result<std::vector<std::string>> {
ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock());
return self->ProcessListBuckets(client_lock.Move()->ListBuckets());
};
return DeferNotOk(SubmitIO(ctx, std::move(deferred)));
return DeferNotOk(SubmitIO(io_context_, std::move(deferred)));
}

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
Expand Down

0 comments on commit 7edea03

Please sign in to comment.