GH-34213: [C++] Use recursive calls without a delimiter if the user is doing a recursive GetFileInfo #35440
Conversation
I'm leaving this in draft while I do more profiling. I have already tested the worst-case scenario (10k files spread across 10k directories) and it improves performance by 10-15x when testing from my desktop to S3. I've also tested the flat scenario (10k files in the bucket with no directories) and there is no regression. I also want to test running from within EC2; I expect the performance gains to be smaller since the request latency is lower, but there should still be some gain. Finally, I want to run some local perf tests with minio.
Thanks for the report @westonpace 👍
Force-pushed from 2a12837 to 5c88217 (Compare)
I've gone through and done some more thorough testing now. I was surprised to see we even have a big improvement when running in EC2, even if we're in the same datacenter (where the latency should be very low). I tested with two test datasets. The first test dataset (s3://ursa-qa/wide-partition) was 10,000 files split into 10,000 folders nested two deep:
The second dataset (s3://ursa-qa/flat-partition) was 10,000 files in the root folder:
For all of my tests I timed how long it took to recursively list all files in the dataset. I ran the tests on my local desktop (outside of EC2), on an EC2 server in a different region (us-west-2), and on an EC2 server in the same region (us-east-2). All times are in seconds.

There were (as hoped) significant performance improvements in the wide-partition dataset:

Regrettably, there might be a slight regression in the flat-partition dataset, although it is largely within the noise. I have run the tests frequently enough that I feel it is stable:

I've verified that, in both the old and new approach, we are sending the same number of HTTP messages to S3 and the content is very close (less than 300 bytes difference). I don't think it is additional compute time (or else I'd expect to see a worse regression on the AWS servers). We could keep both styles (tree walking and recursive listing) but I don't think this regression is significant enough to justify the complexity.

There is one other case that would likely regress: the case where the data is deeply partitioned (e.g. each file is 4 or 5 folders deep) and the user specifies a low max recursion. For example...
I would expect no regression if I fully listed the above dataset. However, if I listed the above dataset with a max_recursion of 2, then the old approach would likely be much faster since it only needs to return 1 file info (the new approach would return all 10k file infos and then we would pare them down in memory). I'm not aware of anyone using this use case (pyarrow doesn't even expose max_recursion), so I'm not sure it is worth the complexity of keeping both approaches. Even in this case I suspect we would be looking at a difference of 0.3 vs. 3 seconds, which is better than our current worst case (~100 seconds).
Thanks @westonpace, this matches what we see as well. We're really looking forward to seeing this merged; it will be hugely beneficial on our current workflows.
@westonpace Thanks for the analysis. I agree this looks very beneficial.
```cpp
if (offset >= static_cast<int>(components.size())) {
  return "";
}
int end = length;
```
Why not use `length` directly?
Actually, this should be `length + offset`. I've fixed it.
```cpp
ARROW_EXPORT
std::string GetAbstractPathExtension(const std::string& s);
std::string SliceAbstractPath(const std::string& path, int offset, int length,
```
Can you add basic tests for this in `filesystem_test.cc`?
Done.
```
@@ -204,6 +204,29 @@ TEST(AsyncTaskScheduler, InitialTaskFails) {
  ASSERT_FINISHES_AND_RAISES(Invalid, finished);
}

TEST(AsyncTaskScheduler, TaskDestroyedBeforeSchedulerEnds) {
  bool my_task_destroyed = false;
```
Should it be an atomic?
No. The AsyncTaskScheduler itself does not actually spawn thread tasks. It relies on the tasks themselves to do this (in fact, it doesn't include executor.h or have any knowledge of thread pools). So this test does not involve threads at all and is entirely synchronous.
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
int base_depth =
    (prefix.empty())
        ? 0
        : static_cast<int>(std::count(prefix.begin(), prefix.end(), kSep));
```
Write a small helper for this?

```cpp
int GetAbstractPathDepth(util::string_view path) {
  if (path.empty()) { return 0; }
  return static_cast<int>(std::count(path.begin(), path.end(), kSep));
}
```
I did, and I put it in `path_util.h`.
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
// no way we can hit "not found"
// * If the key is not empty, then it's possible
// that the file itself didn't exist and there
// were not files under it. In that case we will hit this if statement and
```
What does "the file itself didn't exist and there were not files under it" mean?
I've reworded the comment. Please take a look.
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
files_queue.Push(PathNotFound(req.GetBucket(), req.GetPrefix()));
}
if (close_sink) {
  files_queue.Close();
```
Does this mean we're in the top-level task? Why not do it from the task continuation?
Not necessarily the top-level task but it means that we've finished all tasks. This happens as the final task is finishing. However, I agree this makes more sense in the task continuation and I've moved it there (this code is no longer a destructor which was kind of confusing).
Also, in this process I ended up getting rid of `close_sink` and moved the logic higher up. I think it has better symmetry as the close of the sink is at the same level as creating the sink.
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
if (parent_base.first.empty()) {
  break;
}
const std::string& parent_dir = parent_base.first;
```
Why not do this above?

```cpp
const auto parent_dir = internal::GetAbstractPathParent(current).first;
```
I've switched to this.
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
sink.Push(std::move(buckets_as_directories));

if (recursive) {
  // Recursively list each bucket (these will run in parallel but out_gen
```
Suggested change:

```diff
- // Recursively list each bucket (these will run in parallel but out_gen
+ // Recursively list each bucket (these will run in parallel but sink
```
Fixed.
@pitrou thanks for the review! I believe I've addressed your feedback.
Just a few more comments. Thanks a lot for the update.
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
const bool allow_not_found;
const int max_recursion;

const bool include_virtual;
```
Perhaps name this `include_implicit_directories`? It seems more descriptive than `include_virtual`.
I changed to `include_implicit_dirs` and updated the wording in comments from "virtual" to "implicit".
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
RETURN_NOT_OK(TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys,
                               handle_results, handle_error, handle_recursion));
void ListAsync(const FileSelector& select, const std::string& bucket,
               const std::string& key, bool include_virtual,
```
Thanks for the explanation. Perhaps make the name more descriptive as already suggested above?
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
}
producer.Close();
return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
    // TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets)
```
Suggested change:

```diff
- // TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets)
+ // TODO(GH-18652) Change to Then(Impl::ProcessListBuckets)
```
Done.
Force-pushed from 2d9f78b to f0e2afd (Compare)
Force-pushed from 229d3ff to 2fa4013 (Compare)
I added a stress test for GetFileInfoGenerator and DeleteDirContents. It was very useful (it detected two bugs: one in this PR and one that existed before); however, it is pretty slow (it doubles the runtime of s3fs_test on my system). The main slow part is that I need to create more than 1000 files so that the listing spans more than one page of results (S3 returns at most 1000 keys per request). @pitrou, I'd be curious to know if you think this test is worth leaving in or if I should take it back out.
Have you tried creating those files in parallel using a ThreadPool?
Yes, it should have the latest client_lock fixes (using …).
Force-pushed from 5219550 to c9ec4d3 (Compare)
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
if (result.GetContentLength() > 0 || key[key.size() - 1] != '/') {
  return Status::IOError("Cannot delete directory contents at ", bucket, kSep,
                         key, " because it is a file");
}
```
It seems a bit weird to have this code in a helper function named `EnsureFileAsync`. Perhaps make this a private helper inside `DoDeleteDirContentsAsync`, or have this return a `Result<FileType>` instead?
I changed the method to `EnsureIsDirAsync`, changed it so that it returns `Result<bool>`, and moved the error message into `DeleteDirContentsAsync`.
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
[](const Status& st) {
  // No need for special abort logic.
},
StopToken::Unstoppable());
```
Pass `io_context().stop_token()` instead?
Done. I also added a test for cancellation to make sure it works correctly. There was one other spot I had to change to get it to pass correctly.
…ries we now rely on S3's ability to do a recursive find. This significantly reduces the number of requests made to S3.
- Fix an invalid lifecycle issue.
- Minor tweak to the async task scheduler: tasks should all be destroyed before the scheduler ends.
- Add missing headers.
- Need to use auto to avoid API inconsistency in S3.
- Addressing PR review comments.
- Add a stress test for GetFileInfoGenerator.
- Fix an old bug in DeleteDirContents.
- Fix a bug in GetFileInfoGenerator.
- Addressing review comments.
- Applying suggestion from pitrou.
- Remove yields added for debugging.
- Remove accidentally added include.
…ectly hooked up. Refactored EnsureNotFileAsync into EnsureIsDirAsync and changed it to return a bool
Force-pushed from c9ec4d3 to 56bed6a (Compare)
@westonpace Can you reintegrate the changes from c9ec4d3? They were clobbered when you force-pushed.
Just some minor questions. Thanks for the update.
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
void ListAsync(const FileSelector& select, const std::string& bucket,
               const std::string& key, bool include_implicit_dirs,
               util::AsyncTaskScheduler* scheduler, FileInfoSink sink,
               S3FileSystem::Impl* self) {
```
Why do we pass a `S3FileSystem::Impl*` explicitly here? It is just `this`, right?
Yes. Good catch. I'm not sure what I was thinking. I've removed these arguments.
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
// Fully list all files from all buckets
void FullListAsync(bool include_implicit_dirs, util::AsyncTaskScheduler* scheduler,
                   FileInfoSink sink, io::IOContext io_context, bool recursive,
                   S3FileSystem::Impl* self) {
```
Same question. Also, the `IOContext` needn't be passed explicitly either (it's just `this->io_context_`).
I've removed this argument as well.
Sorry, I missed that when I rebased. I've restored the commit.
I want to echo @cboettig about our excitement for this upgrade to arrow and hope to see it in the next release. It seems that it will greatly accelerate our data pipelines! Thanks so much for your work developing and testing!
@github-actions crossbow submit -g cpp
Revision: 7edea03
Submitted crossbow builds: ursacomputing/crossbow @ actions-21347af314
Rationale for this change
The old model of "walking" the directory could lead to a large number of calls. If someone is fully listing a bucket, they will need to make one S3 API call for every single directory in the bucket. With this approach there is only 1 call made for every 1000 files, regardless of how they are spread across directories.
The only potential regression would be if max_recursion was set to something > 1. For example, if a user had `bucket/foo/bar/<10000 files here>` and made a request for `bucket` with `max_recursion=2`, the new approach will list all 10,000 files and then eliminate the files that don't match. However, I believe these cases (using max_recursion) to be rarer and less common than the typical case of listing all files (which dataset discovery does).
What changes are included in this PR?
The algorithm behind GetFileInfo and DeleteDirContents in S3FileSystem has changed.
Are these changes tested?
Yes, there should be no behavior change. All of the existing filesystem tests will test this change.
Are there any user-facing changes?
No, other than (hopefully) better performance.

Closes: apache#34213
Lead-authored-by: Weston Pace <weston.pace@gmail.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>