Skip to content

Commit

Permalink
ARROW-17640: [C++] Add File Handling Test cases for GlobFile handling…
Browse files Browse the repository at this point in the history
… in Substrait Read (#14132)

This PR adds test cases for GlobFile processing in `substrait::ReadRel`.

Authored-by: Vibhatha Abeykoon <vibhatha@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
  • Loading branch information
vibhatha committed Nov 1, 2022
1 parent 474c7a1 commit a1b161e
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 18 deletions.
109 changes: 108 additions & 1 deletion cpp/src/arrow/engine/substrait/serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ void CheckRoundTripResult(const std::shared_ptr<Schema> output_schema,
compute::ExecContext& exec_context,
std::shared_ptr<Buffer>& buf,
const std::vector<int>& include_columns = {},
const ConversionOptions& conversion_options = {}) {
const ConversionOptions& conversion_options = {},
const compute::SortOptions* sort_options = NULLPTR) {
std::shared_ptr<ExtensionIdRegistry> sp_ext_id_reg = MakeExtensionIdRegistry();
ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
ExtensionSet ext_set(ext_id_reg);
Expand All @@ -196,6 +197,16 @@ void CheckRoundTripResult(const std::shared_ptr<Schema> output_schema,
if (!include_columns.empty()) {
ASSERT_OK_AND_ASSIGN(output_table, output_table->SelectColumns(include_columns));
}
if (sort_options) {
ASSERT_OK_AND_ASSIGN(
auto sort_indices,
SortIndices(output_table, std::move(*sort_options), &exec_context));
ASSERT_OK_AND_ASSIGN(
auto maybe_table,
compute::Take(output_table, std::move(sort_indices),
compute::TakeOptions::NoBoundsCheck(), &exec_context));
output_table = maybe_table.table();
}
ASSERT_OK_AND_ASSIGN(output_table, output_table->CombineChunks());
AssertTablesEqual(*expected_table, *output_table);
}
Expand Down Expand Up @@ -3615,5 +3626,101 @@ TEST(Substrait, NestedEmitProjectWithMultiFieldExpressions) {
buf, {}, conversion_options);
}

// Checks that a Substrait ReadRel whose local_files item uses "uri_path_glob"
// reads every file matched by the glob. Three IPC files are written into three
// separate temporary directories that share a common name prefix, and the
// glob in the plan is built so that it matches all of them.
TEST(Substrait, ReadRelWithGlobFiles) {
#ifdef _WIN32
  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
#endif
  compute::ExecContext exec_context;
  arrow::dataset::internal::Initialize();

  auto dummy_schema =
      schema({field("A", int32()), field("B", int32()), field("C", int32())});

  // One small table per file that will be written.
  auto table_1 = TableFromJSON(dummy_schema, {R"([
[1, 1, 10],
[3, 4, 20]
])"});
  auto table_2 = TableFromJSON(dummy_schema, {R"([
[11, 11, 110],
[13, 14, 120]
])"});
  auto table_3 = TableFromJSON(dummy_schema, {R"([
[21, 21, 210],
[23, 24, 220]
])"});
  // The rows of all three tables, in ascending order of column "A".
  auto expected_table = TableFromJSON(dummy_schema, {R"([
[1, 1, 10],
[3, 4, 20],
[11, 11, 110],
[13, 14, 120],
[21, 21, 210],
[23, 24, 220]
])"});

  std::vector<std::shared_ptr<Table>> input_tables = {table_1, table_2, table_3};
  const std::vector<std::string> file_names = {"serde_test_1.arrow", "serde_test_2.arrow",
                                               "serde_test_3.arrow"};
  // Every file name is paired with the table at the same index below.
  ASSERT_EQ(input_tables.size(), file_names.size());

  auto filesystem = std::make_shared<fs::LocalFileSystem>();
  const std::string path_prefix = "substrait-globfiles-";

  // Keep the TemporaryDir objects alive until the end of the test: once one
  // goes out of scope, the directory and the file written into it are removed.
  std::vector<std::unique_ptr<arrow::internal::TemporaryDir>> tempdirs;
  for (size_t i = 0; i < file_names.size(); i++) {
    ASSERT_OK_AND_ASSIGN(auto tempdir, arrow::internal::TemporaryDir::Make(path_prefix));
    tempdirs.push_back(std::move(tempdir));
  }

  // Everything before the prefix in the first directory's path is taken as
  // the common parent; the glob then matches "<parent><prefix>*/serde_test_*.arrow".
  const std::string sample_tempdir_path = tempdirs[0]->path().ToString();
  const std::string base_tempdir_path =
      sample_tempdir_path.substr(0, sample_tempdir_path.find(path_prefix));
  const std::string glob_like_path =
      "file://" + base_tempdir_path + path_prefix + "*/serde_test_*.arrow";

  // Write the i-th table as the i-th file inside the i-th temporary directory.
  for (size_t i = 0; i < file_names.size(); i++) {
    ASSERT_OK_AND_ASSIGN(auto file_path, tempdirs[i]->path().Join(file_names[i]));
    const std::string file_path_str = file_path.ToString();
    WriteIpcData(file_path_str, filesystem, input_tables[i]);
  }

  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
"relations": [{
"rel": {
"read": {
"base_schema": {
"names": ["A", "B", "C"],
"struct": {
"types": [{
"i32": {}
}, {
"i32": {}
}, {
"i32": {}
}]
}
},
"local_files": {
"items": [
{
"uri_path_glob": ")" + glob_like_path +
                                                                   R"(",
"arrow": {}
}
]
}
}
}
}]
})"));

  // The output is sorted by "A" before it is compared with expected_table, so
  // the check does not depend on the order in which the matched files are read.
  compute::SortOptions options({compute::SortKey("A", compute::SortOrder::Ascending)});
  CheckRoundTripResult(std::move(dummy_schema), std::move(expected_table), exec_context,
                       buf, {}, {}, &options);
}

} // namespace engine
} // namespace arrow
31 changes: 15 additions & 16 deletions cpp/src/arrow/filesystem/filesystem_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ TEST(PathUtil, Globber) {
ASSERT_FALSE(wildcards.Matches("/bucket0/foo/ab/a.parquet"));
}

TEST(InternalUtil, GlobFiles) {
void TestGlobFiles(const std::string& base_dir) {
auto fs = std::make_shared<MockFileSystem>(TimePoint{});

auto check_entries = [](const std::vector<FileInfo>& infos,
Expand All @@ -308,28 +308,27 @@ TEST(InternalUtil, GlobFiles) {
ASSERT_EQ(actual, expected);
};

ASSERT_OK(fs->CreateDir("A/CD"));
ASSERT_OK(fs->CreateDir("AB/CD"));
ASSERT_OK(fs->CreateDir("AB/CD/ab"));
CreateFile(fs.get(), "A/CD/ab.txt", "data");
CreateFile(fs.get(), "AB/CD/a.txt", "data");
CreateFile(fs.get(), "AB/CD/abc.txt", "data");
CreateFile(fs.get(), "AB/CD/ab/c.txt", "data");
ASSERT_OK(fs->CreateDir(base_dir + "A/CD"));
ASSERT_OK(fs->CreateDir(base_dir + "AB/CD"));
ASSERT_OK(fs->CreateDir(base_dir + "AB/CD/ab"));
CreateFile(fs.get(), base_dir + "A/CD/ab.txt", "data");
CreateFile(fs.get(), base_dir + "AB/CD/a.txt", "data");
CreateFile(fs.get(), base_dir + "AB/CD/abc.txt", "data");
CreateFile(fs.get(), base_dir + "AB/CD/ab/c.txt", "data");

FileInfoVector infos;
ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, "A*/CD/?b*.txt"));
ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, base_dir + "A*/CD/?b*.txt"));
ASSERT_EQ(infos.size(), 2);
check_entries(infos, {"A/CD/ab.txt", "AB/CD/abc.txt"});
check_entries(infos, {base_dir + "A/CD/ab.txt", base_dir + "AB/CD/abc.txt"});

// Leading slash is optional but doesn't change behavior
ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, "/A*/CD/?b*.txt"));
ASSERT_EQ(infos.size(), 2);
check_entries(infos, {"A/CD/ab.txt", "AB/CD/abc.txt"});

ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, "A*/CD/?/b*.txt"));
ASSERT_OK_AND_ASSIGN(infos, GlobFiles(fs, base_dir + "A*/CD/?/b*.txt"));
ASSERT_EQ(infos.size(), 0);
}

// Runs the shared GlobFiles scenario with an empty base directory, i.e. with
// paths and glob patterns that do not start with a slash.
TEST(InternalUtil, GlobFilesWithoutLeadingSlash) {
  const std::string no_prefix;
  TestGlobFiles(no_prefix);
}

// Runs the shared GlobFiles scenario with "/" as the base directory, i.e. with
// paths and glob patterns that start with a slash.
TEST(InternalUtil, GlobFilesWithLeadingSlash) {
  const std::string root_prefix = "/";
  TestGlobFiles(root_prefix);
}

////////////////////////////////////////////////////////////////////////////
// Generic MockFileSystem tests

Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/filesystem/path_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ Status AssertNoTrailingSlash(std::string_view key) {
return Status::OK();
}

/// \brief Return whether `key` starts with a '/' character.
///
/// An empty key is reported as having no leading slash. The emptiness check
/// comes first because calling front() on an empty std::string_view is
/// undefined behavior.
bool HasLeadingSlash(std::string_view key) {
  return !key.empty() && key.front() == '/';
}

Result<std::string> MakeAbstractPathRelative(const std::string& base,
const std::string& path) {
if (base.empty() || base.front() != kSep) {
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/filesystem/path_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ std::string_view RemoveTrailingSlash(std::string_view s);
ARROW_EXPORT
Status AssertNoTrailingSlash(std::string_view s);

/// \brief Return whether the given path starts with a '/' character.
ARROW_EXPORT
bool HasLeadingSlash(std::string_view s);

ARROW_EXPORT
bool IsAncestorOf(std::string_view ancestor, std::string_view descendant);

Expand Down
5 changes: 4 additions & 1 deletion cpp/src/arrow/filesystem/util_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Result<FileInfoVector> GlobFiles(const std::shared_ptr<FileSystem>& filesystem,
FileInfoVector results{FileInfo("", FileType::Directory)};
// The exact tail that will later require matching with candidate entries
std::string current_tail;

auto is_leading_slash = HasLeadingSlash(glob);
auto split_glob = SplitAbstractPath(glob, '/');

// Process one depth level at once, from root to leaf
Expand All @@ -104,6 +104,9 @@ Result<FileInfoVector> GlobFiles(const std::shared_ptr<FileSystem>& filesystem,
selector.base_dir = current_tail.empty()
? res.path()
: ConcatAbstractPath(res.path(), current_tail);
if (is_leading_slash) {
selector.base_dir = EnsureLeadingSlash(selector.base_dir);
}
ARROW_ASSIGN_OR_RAISE(auto entries, filesystem->GetFileInfo(selector));
Globber globber(ConcatAbstractPath(selector.base_dir, glob_component));
for (auto&& entry : entries) {
Expand Down

0 comments on commit a1b161e

Please sign in to comment.