Skip to content

Commit

Permalink
Addressing PR review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed Jun 27, 2023
1 parent c69181a commit 2d9f78b
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 71 deletions.
41 changes: 41 additions & 0 deletions cpp/src/arrow/filesystem/filesystem_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,34 @@ TEST(PathUtil, SplitAbstractPath) {
AssertPartsEqual(parts, {"abc", "def.ghi"});
}

TEST(PathUtil, SliceAbstractPath) {
  // A single-component path: any slice that covers index 0 returns the whole
  // path, and a zero-length slice returns the empty string
  std::string input = "abc";
  ASSERT_EQ("abc", SliceAbstractPath(input, 0, 1));
  ASSERT_EQ("abc", SliceAbstractPath(input, 0, 2));
  ASSERT_EQ("", SliceAbstractPath(input, 0, 0));
  ASSERT_EQ("", SliceAbstractPath(input, 1, 0));

  // A multi-component path; note the backslash is an ordinary character when
  // the default separator ('/') is in effect
  input = "abc/def\\x/y.ext";
  ASSERT_EQ("abc/def\\x/y.ext", SliceAbstractPath(input, 0, 4));
  ASSERT_EQ("abc/def\\x/y.ext", SliceAbstractPath(input, 0, 3));
  ASSERT_EQ("abc/def\\x", SliceAbstractPath(input, 0, 2));
  ASSERT_EQ("abc", SliceAbstractPath(input, 0, 1));
  ASSERT_EQ("def\\x/y.ext", SliceAbstractPath(input, 1, 2));
  ASSERT_EQ("def\\x/y.ext", SliceAbstractPath(input, 1, 3));
  ASSERT_EQ("def\\x", SliceAbstractPath(input, 1, 1));
  ASSERT_EQ("y.ext", SliceAbstractPath(input, 2, 1));
  ASSERT_EQ("", SliceAbstractPath(input, 3, 1));

  // Slicing with an explicit (non-default) separator
  input = "x/y\\z";
  ASSERT_EQ("x", SliceAbstractPath(input, 0, 1));
  ASSERT_EQ("x/y", SliceAbstractPath(input, 0, 1, /*sep=*/'\\'));

  // Invalid cases but we shouldn't crash
  ASSERT_EQ("", SliceAbstractPath(input, -1, 1));
  ASSERT_EQ("", SliceAbstractPath(input, 0, -1));
  ASSERT_EQ("", SliceAbstractPath(input, -1, -1));
}

TEST(PathUtil, GetAbstractPathExtension) {
ASSERT_EQ(GetAbstractPathExtension("abc.txt"), "txt");
ASSERT_EQ(GetAbstractPathExtension("dir/abc.txt"), "txt");
Expand All @@ -98,6 +126,19 @@ TEST(PathUtil, GetAbstractPathExtension) {
ASSERT_EQ(GetAbstractPathExtension("/run.d/abc"), "");
}

TEST(PathUtil, GetAbstractPathDepth) {
  // Each case pairs the expected depth with the input path. Leading and
  // trailing slashes should not contribute to the depth.
  const std::vector<std::pair<int, std::string>> cases = {
      {0, ""},         {0, "/"},         {1, "foo"},     {1, "foo/"},
      {1, "/foo"},     {1, "/foo/"},     {2, "/foo/bar"}, {2, "/foo/bar/"},
      {2, "foo/bar"},  {2, "foo/bar/"}};
  for (const auto& [expected_depth, path] : cases) {
    ASSERT_EQ(expected_depth, GetAbstractPathDepth(path)) << "path: " << path;
  }
}

TEST(PathUtil, GetAbstractPathParent) {
std::pair<std::string, std::string> pair;

Expand Down
16 changes: 15 additions & 1 deletion cpp/src/arrow/filesystem/path_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ std::string SliceAbstractPath(const std::string& s, int offset, int length, char
if (offset >= static_cast<int>(components.size())) {
return "";
}
int end = length;
int end = offset + length;
if (end > static_cast<int>(components.size())) {
end = static_cast<int>(components.size());
}
Expand All @@ -89,6 +89,20 @@ std::string SliceAbstractPath(const std::string& s, int offset, int length, char
return combined.str();
}

// Return the number of components in an abstract path.
//
// Leading and trailing separators do not contribute to the depth, so
// "" and "/" have depth 0, "foo", "/foo" and "/foo/" have depth 1, and
// "/foo/bar" has depth 2.
int GetAbstractPathDepth(std::string_view path) {
  if (path.empty()) {
    return 0;
  }
  // One more component than there are separators...
  int depth = static_cast<int>(std::count(path.begin(), path.end(), kSep)) + 1;
  // ...minus one if the path ends with a separator (it adds no component)...
  if (path.back() == kSep) {
    depth -= 1;
  }
  // ...and minus one if it starts with one (path is non-empty here, so
  // front()/back() are safe; no need to re-check the size)
  if (path.front() == kSep) {
    depth -= 1;
  }
  return depth;
}

std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s) {
// XXX should strip trailing slash?

Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/filesystem/path_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ std::string SliceAbstractPath(const std::string& path, int offset, int length,
// Return the extension of the file
ARROW_EXPORT std::string GetAbstractPathExtension(const std::string& s);

// Return the depth (number of components) of an abstract path
//
// Trailing slashes do not count towards depth
// Leading slashes do not count towards depth
//
// The root path ("/") has depth 0
ARROW_EXPORT int GetAbstractPathDepth(std::string_view path);

// Return the parent directory and basename of an abstract path. Both values may be
// empty.
ARROW_EXPORT
Expand Down
148 changes: 78 additions & 70 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1734,7 +1734,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
static FileInfo MakeDirectoryInfo(std::string dirname) {
FileInfo dir;
dir.set_type(FileType::Directory);
dir.set_path(dirname);
dir.set_path(std::move(dirname));
return dir;
}

Expand All @@ -1750,30 +1750,27 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

struct FileListerState {
FileInfoSink files_queue;
bool allow_not_found;
int max_recursion;
bool include_virtual;
const bool allow_not_found;
const int max_recursion;

const bool include_virtual;
const io::IOContext io_context;
const std::shared_ptr<Aws::S3::S3Client> client;

S3Model::ListObjectsV2Request req;
io::IOContext io_context;
std::shared_ptr<Aws::S3::S3Client> client;
bool close_sink;
bool no_files_means_not_found;
std::unordered_set<std::string> directories;
bool empty = true;

FileListerState(PushGenerator<std::vector<FileInfo>>::Producer files_queue,
FileSelector select, const std::string& bucket,
const std::string& key, bool include_virtual,
io::IOContext io_context, std::shared_ptr<Aws::S3::S3Client> client,
bool close_sink)
io::IOContext io_context, std::shared_ptr<Aws::S3::S3Client> client)
: files_queue(std::move(files_queue)),
allow_not_found(select.allow_not_found),
max_recursion(select.max_recursion),
include_virtual(include_virtual),
io_context(io_context),
client(std::move(client)),
close_sink(close_sink),
no_files_means_not_found(!select.allow_not_found && !key.empty()) {
client(std::move(client)) {
req.SetBucket(bucket);
req.SetMaxKeys(kListObjectsMaxKeys);
if (!key.empty()) {
Expand All @@ -1784,25 +1781,29 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}
}

// The FileListerState is kept alive by the various FileListerTasks. Once all the
// tasks are finished this will be destroyed and we can run some cleanup
~FileListerState() {
// * If the bucket doesn't exist we will have already gotten an error from the
// ListObjectsV2 call
// * If the key is empty, and the bucket exists, then there is
// no way we can hit "not found"
// * If they key is not empty, then it's possible
// that the file itself didn't exist and there
// were not files under it. In that case we will hit this if statement and
// should treat this as a "not found" case.
if (empty && no_files_means_not_found) {
void Finish() {
// `empty` means that we didn't get a single file info back from S3. This may be
// a situation that we should consider as PathNotFound.
//
// * If the prefix is empty then we were querying the contents of an entire bucket
// and this is not a PathNotFound case because if the bucket didn't exist then
// we would have received an error and not an empty set of results.
//
// * If the prefix is not empty then we asked for all files under a particular
// directory. S3 will also return the directory itself, if it exists. So if
// we get zero results then we know that there are no files under the directory
// and the directory itself doesn't exist. This should be considered PathNotFound
if (empty && !allow_not_found && !req.GetPrefix().empty()) {
files_queue.Push(PathNotFound(req.GetBucket(), req.GetPrefix()));
}
if (close_sink) {
files_queue.Close();
}
}

// Given a path, iterate through all possible sub-paths and, if we haven't
// seen that sub-path before, return it.
//
// For example, given A/B/C we might return A/B and A if we have not seen
// those paths before. This allows us to consider "virtual directories" which
// don't exist as objects in S3 but can be inferred.
std::vector<std::string> GetNewDirectories(const std::string_view& path) {
std::string current(path);
std::string base = req.GetBucket();
Expand All @@ -1811,17 +1812,16 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}
std::vector<std::string> new_directories;
while (true) {
auto parent_base = internal::GetAbstractPathParent(current);
if (parent_base.first.empty()) {
const std::string parent_dir = internal::GetAbstractPathParent(current).first;
if (parent_dir.empty()) {
break;
}
const std::string& parent_dir = parent_base.first;
current = parent_dir;
if (current == base) {
break;
}
if (directories.insert(parent_dir).second) {
new_directories.push_back(std::move(parent_base.first));
new_directories.push_back(std::move(parent_dir));
}
}
return new_directories;
Expand Down Expand Up @@ -1858,10 +1858,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// A file subdirA/subdirB/somefile will have a child depth of 2 and a "depth" of 0.
// A file subdirA/subdirB/subdirC/somefile will have a child depth of 3 and a
// "depth" of 1
int base_depth =
(prefix.empty())
? 0
: static_cast<int>(std::count(prefix.begin(), prefix.end(), kSep));
int base_depth = internal::GetAbstractPathDepth(prefix);
for (const auto& obj : result.GetContents()) {
if (obj.GetKey() == prefix) {
// S3 will return the basedir itself (if it is a file / empty file). We don't
Expand All @@ -1873,16 +1870,16 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
std::string child_key =
std::string(internal::RemoveTrailingSlash(FromAwsString(obj.GetKey())));
bool had_trailing_slash = child_key.size() != obj.GetKey().size();
int child_depth =
static_cast<int>(std::count(child_key.begin(), child_key.end(), kSep));
int depth = child_depth - base_depth;

if (depth > state->max_recursion) {
int child_depth = internal::GetAbstractPathDepth(child_key);
// Recursion depth is 1 smaller because a path with depth 1 (e.g. foo) is
// considered to have a "recursion" of 0
int recursion_depth = child_depth - base_depth - 1;
if (recursion_depth > state->max_recursion) {
// If we have A/B/C/D and max_recursion is 2 then we ignore this (don't add it
// to file_infos) but we still want to potentially add A and A/B as directories.
// So we "pretend" like we have a file A/B/C for the call to GetNewDirectories
// below
int to_trim = depth - state->max_recursion - 1;
int to_trim = recursion_depth - state->max_recursion - 1;
if (to_trim > 0) {
child_key = bucket + kSep +
internal::SliceAbstractPath(child_key, 0, child_depth - to_trim);
Expand Down Expand Up @@ -1959,6 +1956,9 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
DCHECK(!result.GetNextContinuationToken().empty());
state->req.SetContinuationToken(result.GetNextContinuationToken());
scheduler->AddTask(std::make_unique<FileListerTask>(state, scheduler));
} else {
// Otherwise, we have finished listing all the files
state->Finish();
}
}

Expand All @@ -1971,17 +1971,24 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
std::string_view name() const override { return "S3ListFiles"; }
};

// Lists all file, potentially recursively, in a bucket
//
// include_virtual controls whether or not "virtual" files should be included. These
// are files that are not actually file objects but instead are inferred from other
// objects.
//
// For example, if a file exists with path A/B/C then virtual directories A/ and A/B/
// will exist even if there are no file objects with these paths.
void ListAsync(const FileSelector& select, const std::string& bucket,
const std::string& key, bool include_virtual,
util::AsyncTaskScheduler* scheduler, FileInfoSink sink,
bool close_sink) {
util::AsyncTaskScheduler* scheduler, FileInfoSink sink) {
// We can only fetch kListObjectsMaxKeys files at a time and so we create a
// scheduler and schedule a task to grab the first batch. Once that's done we
// schedule a new task for the next batch. All of these tasks share the same
// FileListerState object but none of these tasks run in parallel so there is
// no need to worry about mutexes
auto state = std::make_shared<FileListerState>(
sink, select, bucket, key, include_virtual, io_context_, client_, close_sink);
auto state = std::make_shared<FileListerState>(sink, select, bucket, key,
include_virtual, io_context_, client_);

// Create the first file lister task (it may spawn more)
auto file_lister_task = std::make_unique<FileListerTask>(state, scheduler);
Expand Down Expand Up @@ -2012,15 +2019,14 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
sink.Push(std::move(buckets_as_directories));

if (recursive) {
// Recursively list each bucket (these will run in parallel but out_gen
// should be thread safe)
// Recursively list each bucket (these will run in parallel but sink
// should be thread safe and so this is ok)
for (const auto& bucket : buckets) {
FileSelector select;
select.allow_not_found = true;
select.recursive = true;
select.base_dir = bucket;
self->ListAsync(select, bucket, "", include_virtual, scheduler, sink,
/*close_sink=*/false);
self->ListAsync(select, bucket, "", include_virtual, scheduler, sink);
}
}
});
Expand Down Expand Up @@ -2126,10 +2132,16 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
select.allow_not_found = false;

PushGenerator<std::vector<FileInfo>> file_infos_gen;
// Close the generator when all the listing tasks are done
std::unique_ptr<util::AsyncTaskGroup> listing_scheduler =
util::AsyncTaskGroup::Make(
scheduler, [prod = file_infos_gen.producer()]() mutable {
prod.Close();
return Status::OK();
});

self_ptr->ListAsync(select, bucket, key, /*include_virtual=*/false,
scheduler, file_infos_gen.producer(),
/*close_sink=*/true);
listing_scheduler.get(), file_infos_gen.producer());

auto handle_file_infos = [self_ptr, bucket, scheduler](
const std::vector<FileInfo>& file_infos) {
Expand Down Expand Up @@ -2301,11 +2313,6 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {

auto outcome = impl_->client_->HeadObject(req);
if (outcome.IsSuccess()) {
bool ends_in_slash = s[s.size() - 1] == '/';
if (outcome.GetResult().GetContentLength() == 0 && ends_in_slash) {
info.set_type(FileType::Directory);
return info;
}
// "File" object found
FileObjectToInfo(outcome.GetResult(), &info);
return info;
Expand Down Expand Up @@ -2353,21 +2360,22 @@ FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select)
auto base_path = *std::move(maybe_base_path);

PushGenerator<std::vector<FileInfo>> generator;
auto start_listing = [&, sink = generator.producer()](
util::AsyncTaskScheduler* scheduler) {
if (base_path.empty()) {
bool should_recurse = select.recursive && select.max_recursion > 0;
impl_->FullListAsync(/*include_virtual=*/true, scheduler, sink, io_context(),
should_recurse);
} else {
impl_->ListAsync(select, base_path.bucket, base_path.key,
/*include_virtual=*/true, scheduler, sink, /*close_sink=*/false);
}
return Status::OK();
};
auto start_listing =
[&, sink = generator.producer()](util::AsyncTaskScheduler* scheduler) {
if (base_path.empty()) {
bool should_recurse = select.recursive && select.max_recursion > 0;
impl_->FullListAsync(/*include_virtual=*/true, scheduler, sink, io_context(),
should_recurse);
} else {
impl_->ListAsync(select, base_path.bucket, base_path.key,
/*include_virtual=*/true, scheduler, sink);
}
return Status::OK();
};

Future<> all_done_fut = util::AsyncTaskScheduler::Make(
std::move(start_listing), [](const Status&) {}, StopToken::Unstoppable());
std::move(start_listing), /*abort_callback=*/[](const Status&) {},
StopToken::Unstoppable());

// Mark the generator done once all tasks are finished
all_done_fut.AddCallback(
Expand Down

0 comments on commit 2d9f78b

Please sign in to comment.