Add support for multi-directory globs #50559

Merged
merged 43 commits into from Jul 19, 2023
Changes from 18 commits
1a361ef
works for file
zvonand Jun 5, 2023
ece5380
add simple stateless for file()
zvonand Jun 7, 2023
bfdb186
small docs udpate
zvonand Jun 7, 2023
2c97a94
fix hdfs + style update
zvonand Jun 10, 2023
aad7712
add existing test
zvonand Jun 11, 2023
427c5cb
fix integration test
zvonand Jun 11, 2023
eb9cdbc
fix File test being flaky
zvonand Jun 12, 2023
7d7bd5b
update comment describing workflow
zvonand Jun 12, 2023
3e6d393
remove debug cerr
zvonand Jun 12, 2023
cd1a391
cleanup HDFS
zvonand Jun 12, 2023
1d80130
fix style & black
zvonand Jun 13, 2023
2f572b7
Merge branch 'master' into zvonand-issue-49290
zvonand Jun 14, 2023
02cf8a1
Update comment StorageFile.cpp
zvonand Jun 15, 2023
9e3ddec
Merge branch 'master' of https://github.com/ClickHouse/ClickHouse int…
zvonand Jun 15, 2023
00a5df6
Merge branch 'master' into zvonand-issue-49290
zvonand Jun 16, 2023
1c10578
update to master
zvonand Jun 16, 2023
b25555d
update style
zvonand Jun 17, 2023
278b2ec
Merge branch 'master' into zvonand-issue-49290
zvonand Jun 17, 2023
b16c30e
upd for review
zvonand Jun 19, 2023
4e61f93
upd
zvonand Jun 19, 2023
20f8b0b
Merge branch 'master' into zvonand-issue-49290
zvonand Jun 20, 2023
765b4ce
added wrong path for hdfs test
zvonand Jun 22, 2023
18e7b02
Merge branch 'master' into zvonand-issue-49290
zvonand Jun 22, 2023
84bb170
fix black
zvonand Jun 22, 2023
0b6688f
Merge branch 'master' into zvonand-issue-49290
zvonand Jun 24, 2023
ae5e715
Merge branch 'master' into zvonand-issue-49290
zvonand Jun 24, 2023
d0fa110
Merge branch 'master' into zvonand-issue-49290
zvonand Jun 24, 2023
c054fc8
Merge branch 'master' into zvonand-issue-49290
zvonand Jun 26, 2023
ddd8f6b
Merge branch 'master' into zvonand-issue-49290
zvonand Jun 26, 2023
ae26ff2
Merge branch 'master' into zvonand-issue-49290
zvonand Jun 28, 2023
cc0210f
Merge branch 'master' into zvonand-issue-49290
zvonand Jul 6, 2023
3e8d024
Merge branch 'master' into zvonand-issue-49290
zvonand Jul 7, 2023
c030e9d
Merge branch 'master' into zvonand-issue-49290
zvonand Jul 10, 2023
664fca6
Merge branch 'master' into zvonand-issue-49290
zvonand Jul 10, 2023
a9ae26c
Merge branch 'master' into zvonand-issue-49290
zvonand Jul 11, 2023
22d8b35
Merge branch 'master' into zvonand-issue-49290
zvonand Jul 12, 2023
57c12c0
Merge branch 'master' into zvonand-issue-49290
zvonand Jul 13, 2023
871cce6
Merge branch 'master' into zvonand-issue-49290
zvonand Jul 13, 2023
375a793
Merge branch 'master' into zvonand-issue-49290
zvonand Jul 13, 2023
d597f3e
Merge branch 'master' into zvonand-issue-49290
zvonand Jul 13, 2023
dbdd156
Merge branch 'master' into zvonand-issue-49290
zvonand Jul 14, 2023
d339c22
Merge branch 'master' into zvonand-issue-49290
zvonand Jul 17, 2023
4884022
Merge branch 'master' into zvonand-issue-49290
alexey-milovidov Jul 17, 2023
2 changes: 1 addition & 1 deletion docs/en/sql-reference/table-functions/file.md
@@ -134,7 +134,7 @@ Multiple path components can have globs. For being processed file must exist and

- `*` — Substitutes any number of any characters except `/` including empty string.
- `?` — Substitutes any single character.
- - `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`.
+ - `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`, including `/`.
- `{N..M}` — Substitutes any number in range from N to M including both borders.
- `**` - Fetches all files inside the folder recursively.
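
As a rough illustration of the glob forms listed above, here is a minimal Python sketch that translates them into a regular expression. It is not ClickHouse's `makeRegexpPatternFromGlobs`: the name `glob_to_regex` is hypothetical, nested braces and leading zeros in `{N..M}` are not handled, and treating `?` as "one character other than `/`" is an assumption that the list does not state.

```python
import re


def glob_to_regex(glob):
    """Translate a glob into a regex string (illustrative sketch only)."""
    out = []
    i = 0
    while i < len(glob):
        ch = glob[i]
        if ch == "*":
            if glob.startswith("**", i):
                out.append(".*")  # `**` may cross directory boundaries
                i += 2
                continue
            out.append("[^/]*")  # `*`: any characters except '/', possibly none
        elif ch == "?":
            out.append("[^/]")  # assumption: `?` does not match '/'
        elif ch == "{":
            end = glob.index("}", i)  # nested braces are not supported here
            body = glob[i + 1:end]
            numeric_range = re.fullmatch(r"(\d+)\.\.(\d+)", body)
            if numeric_range:
                low, high = int(numeric_range.group(1)), int(numeric_range.group(2))
                options = [str(n) for n in range(low, high + 1)]
            else:
                options = body.split(",")
            # Each alternative may itself contain `?`, `*` or '/'.
            out.append("(?:" + "|".join(glob_to_regex(o) for o in options) + ")")
            i = end + 1
            continue
        else:
            out.append(re.escape(ch))
        i += 1
    return "".join(out)


pattern = glob_to_regex("/multiglob/{p1/path1,p2/path2}/postfix/data{1,2}")
print(bool(re.fullmatch(pattern, "/multiglob/p1/path1/postfix/data1")))
```

Because each alternative inside `{...}` is translated recursively, an alternative can contain `/`, which is the case this PR adds to the documentation.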

2 changes: 1 addition & 1 deletion docs/ru/sql-reference/table-functions/file.md
@@ -79,7 +79,7 @@ SELECT * FROM file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 U

- `*` — заменяет любое количество любых символов кроме `/`, включая отсутствие символов.
- `?` — заменяет ровно один любой символ.
- - `{some_string,another_string,yet_another_one}` — заменяет любую из строк `'some_string', 'another_string', 'yet_another_one'`.
+ - `{some_string,another_string,yet_another_one}` — заменяет любую из строк `'some_string', 'another_string', 'yet_another_one'`, причём строка может содержать `/`.
- `{N..M}` — заменяет любое число в интервале от `N` до `M` включительно (может содержать ведущие нули).

Конструкция с `{}` аналогична табличной функции [remote](remote.md).
126 changes: 119 additions & 7 deletions src/Storages/HDFS/StorageHDFS.cpp
@@ -63,23 +63,135 @@ namespace ErrorCodes
}
namespace
{
/// Forward-declared to use in LSWithFoldedRegexpMatching w/o circular dependency.
std::vector<StorageHDFS::PathWithInfo> LSWithRegexpMatching(const String & path_for_ls,
const HDFSFSPtr & fs,
const String & for_match);

/*
* When `{...}` has any `/`s, it must be processed in a different way:
* Basically, a path with globs is processed by LSWithRegexpMatching. In case it detects multi-dir glob {.../..., .../...},
* LSWithFoldedRegexpMatching is in charge from now on.
* It works a bit differently: it still recursively goes through subdirectories, but does not match every directory against the glob.
* Instead, it goes many levels down (until the approximate max_depth is reached) and compares this multi-dir path to a glob.
* StorageFile.cpp has the same logic.
*/
std::vector<StorageHDFS::PathWithInfo> LSWithFoldedRegexpMatching(const String & path_for_ls,
const HDFSFSPtr & fs,
const String & processed_suffix,
const String & suffix_with_globs,
const String & current_glob,
re2::RE2 & matcher,
const size_t max_depth,
const size_t next_slash_after_glob_pos)
{
/// We don't need to go all the way in every directory if max_depth is reached
/// as it is upper limit of depth by simply counting `/`s in curly braces
if (!max_depth)
return {};

HDFSFileInfo ls;
ls.file_info = hdfsListDirectory(fs.get(), path_for_ls.data(), &ls.length);
if (ls.file_info == nullptr && errno != ENOENT) // NOLINT
{
// Ignore the file-not-found error but keep throwing other errors; libhdfs3 has no function to get the exception type, so errno is used.
throw Exception(
ErrorCodes::ACCESS_DENIED, "Cannot list directory {}: {}", path_for_ls, String(hdfsGetLastError()));
}

std::vector<StorageHDFS::PathWithInfo> result;

if (!ls.file_info && ls.length > 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "file_info shouldn't be null");

for (int i = 0; i < ls.length; ++i)
{
const String full_path = String(ls.file_info[i].mName);
const size_t last_slash = full_path.rfind('/');
const String dir_or_file_name = full_path.substr(last_slash);
const bool is_directory = ls.file_info[i].mKind == 'D';

if (re2::RE2::FullMatch(processed_suffix + dir_or_file_name, matcher))
{
if (next_slash_after_glob_pos == std::string::npos)
{
// result.push_back(String(ls.file_info[i].mName));
// if (last_mod_times)
// (*last_mod_times)[result.back()] = ls.file_info[i].mLastMod;
result.emplace_back(
String(ls.file_info[i].mName),
StorageHDFS::PathInfo{ls.file_info[i].mLastMod, static_cast<size_t>(ls.file_info[i].mSize)});
}
else
{
std::vector<StorageHDFS::PathWithInfo> result_part = LSWithRegexpMatching(
fs::path(full_path) / "" , fs, suffix_with_globs.substr(next_slash_after_glob_pos));
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
}
else if (is_directory)
{
std::vector<StorageHDFS::PathWithInfo> result_part = LSWithFoldedRegexpMatching(
fs::path(full_path), fs, processed_suffix + dir_or_file_name,
suffix_with_globs, current_glob, matcher, max_depth - 1, next_slash_after_glob_pos);
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
}
return result;
}

/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageFile.
*/
- std::vector<StorageHDFS::PathWithInfo> LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match)
+ std::vector<StorageHDFS::PathWithInfo> LSWithRegexpMatching(
+ const String & path_for_ls,
+ const HDFSFSPtr & fs,
+ const String & for_match)
{
- const size_t first_glob = for_match.find_first_of("*?{");
+ const size_t first_glob_pos = for_match.find_first_of("*?{");
+ const bool has_glob = first_glob_pos != std::string::npos;

- const size_t end_of_path_without_globs = for_match.substr(0, first_glob).rfind('/');
+ const size_t end_of_path_without_globs = for_match.substr(0, first_glob_pos).rfind('/');
const String suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begin with '/'
const String prefix_without_globs = path_for_ls + for_match.substr(1, end_of_path_without_globs); /// ends with '/'

- const size_t next_slash = suffix_with_globs.find('/', 1);
- re2::RE2 matcher(makeRegexpPatternFromGlobs(suffix_with_globs.substr(0, next_slash)));
size_t slashes_in_glob = 0;
const size_t next_slash_after_glob_pos = [&]()
{
if (!has_glob)
return suffix_with_globs.find('/', 1);

size_t in_curly = 0;
for (std::string::const_iterator it = ++suffix_with_globs.begin(); it != suffix_with_globs.end(); it++)
{
if (*it == '{')
++in_curly;
else if (*it == '/')
{
if (in_curly)
++slashes_in_glob;
else
return size_t(std::distance(suffix_with_globs.begin(), it));
}
else if (*it == '}')
--in_curly;
}
return std::string::npos;
}();

const std::string current_glob = suffix_with_globs.substr(0, next_slash_after_glob_pos);

re2::RE2 matcher(makeRegexpPatternFromGlobs(current_glob));
if (!matcher.ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", for_match, matcher.error());

if (slashes_in_glob)
{
return LSWithFoldedRegexpMatching(fs::path(prefix_without_globs), fs, "", suffix_with_globs, current_glob,
matcher, slashes_in_glob, next_slash_after_glob_pos);
}

HDFSFileInfo ls;
ls.file_info = hdfsListDirectory(fs.get(), prefix_without_globs.data(), &ls.length);
if (ls.file_info == nullptr && errno != ENOENT) // NOLINT
@@ -96,7 +208,7 @@ namespace
const String full_path = String(ls.file_info[i].mName);
const size_t last_slash = full_path.rfind('/');
const String file_name = full_path.substr(last_slash);
- const bool looking_for_directory = next_slash != std::string::npos;
+ const bool looking_for_directory = next_slash_after_glob_pos != std::string::npos;
const bool is_directory = ls.file_info[i].mKind == 'D';
/// Condition with type of current file_info means what kind of path is it in current iteration of ls
if (!is_directory && !looking_for_directory)
@@ -110,7 +222,7 @@ namespace
{
if (re2::RE2::FullMatch(file_name, matcher))
{
- std::vector<StorageHDFS::PathWithInfo> result_part = LSWithRegexpMatching(fs::path(full_path) / "", fs, suffix_with_globs.substr(next_slash));
+ std::vector<StorageHDFS::PathWithInfo> result_part = LSWithRegexpMatching(fs::path(full_path) / "", fs, suffix_with_globs.substr(next_slash_after_glob_pos));
/// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
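
The brace-aware scan that `LSWithRegexpMatching` performs on `suffix_with_globs` can be sketched in Python as follows. This is a transcription of the lambda's logic for illustration, not the committed code: the function name `scan_first_component` is hypothetical, `None` stands in for `std::string::npos`, and the handling of unbalanced braces is a choice made for this sketch.

```python
def scan_first_component(suffix_with_globs):
    """Return (next_slash_after_glob_pos, slashes_in_glob).

    The suffix is expected to begin with '/'. The first value is the index of
    the first '/' after position 0 that lies outside curly braces, or None if
    there is no such slash. The second value counts the '/' characters seen
    inside curly braces before that point.
    """
    in_curly = 0
    slashes_in_glob = 0
    for pos in range(1, len(suffix_with_globs)):
        ch = suffix_with_globs[pos]
        if ch == "{":
            in_curly += 1
        elif ch == "}":
            in_curly -= 1
        elif ch == "/":
            if in_curly > 0:
                slashes_in_glob += 1  # a slash that belongs to a `{...}` alternative
            else:
                return pos, slashes_in_glob  # end of the current glob component
    return None, slashes_in_glob


suffix = "/{p1/path1,p2/path2}/postfix/data{1,2}"
pos, slashes = scan_first_component(suffix)
print(suffix[:pos], slashes)
```

A non-zero slash count is what makes the caller switch to the folded variant, and the same count is passed on as its `max_depth` argument.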
114 changes: 104 additions & 10 deletions src/Storages/StorageFile.cpp
@@ -92,6 +92,66 @@ namespace ErrorCodes
namespace
{

/// Forward-declare to use in listFilesWithFoldedRegexpMatchingImpl()
void listFilesWithRegexpMatchingImpl(
const std::string & path_for_ls,
const std::string & for_match,
size_t & total_bytes_to_read,
std::vector<std::string> & result,
bool recursive = false);

/*
* When `{...}` has any `/`s, it must be processed in a different way:
* Basically, a path with globs is processed by listFilesWithRegexpMatching. In case it detects multi-dir glob {.../..., .../...},
* listFilesWithFoldedRegexpMatching is in charge from now on.
* It works a bit differently: it still recursively goes through subdirectories, but does not match every directory against the glob.
* Instead, it goes many levels down (until the approximate max_depth is reached) and compares this multi-dir path to a glob.
* StorageHDFS.cpp has the same logic.
*/
void listFilesWithFoldedRegexpMatchingImpl(const std::string & path_for_ls,
const std::string & processed_suffix,
const std::string & suffix_with_globs,
const std::string & current_glob,
re2::RE2 & matcher,
size_t & total_bytes_to_read,
const size_t max_depth,
const size_t next_slash_after_glob_pos,
std::vector<std::string> & result)
{
if (!max_depth)
return;

const fs::directory_iterator end;
for (fs::directory_iterator it(path_for_ls); it != end; ++it)
{
const std::string full_path = it->path().string();
const size_t last_slash = full_path.rfind('/');
const String dir_or_file_name = full_path.substr(last_slash);

if (re2::RE2::FullMatch(processed_suffix + dir_or_file_name, matcher))
{
if (next_slash_after_glob_pos == std::string::npos)
{
total_bytes_to_read += it->file_size();
result.push_back(it->path().string());
}
else
{
listFilesWithRegexpMatchingImpl(fs::path(full_path) / "" ,
suffix_with_globs.substr(next_slash_after_glob_pos),
total_bytes_to_read, result);
}
}
else if (it->is_directory())
{
listFilesWithFoldedRegexpMatchingImpl(fs::path(full_path), processed_suffix + dir_or_file_name,
suffix_with_globs, current_glob, matcher,
total_bytes_to_read, max_depth - 1, next_slash_after_glob_pos, result);
}

}
}

/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageHDFS.
*/
@@ -100,15 +160,42 @@ void listFilesWithRegexpMatchingImpl(
const std::string & for_match,
size_t & total_bytes_to_read,
std::vector<std::string> & result,
- bool recursive = false)
+ bool recursive)
{
- const size_t first_glob = for_match.find_first_of("*?{");
+ const size_t first_glob_pos = for_match.find_first_of("*?{");
+ const bool has_glob = first_glob_pos != std::string::npos;

- const size_t end_of_path_without_globs = for_match.substr(0, first_glob).rfind('/');
+ const size_t end_of_path_without_globs = for_match.substr(0, first_glob_pos).rfind('/');
const std::string suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begin with '/'

- const size_t next_slash = suffix_with_globs.find('/', 1);
- const std::string current_glob = suffix_with_globs.substr(0, next_slash);
/// slashes_in_glob counter is an upper-bound estimate of recursion depth
/// needed to process complex cases when `/` is included into glob, e.g. /pa{th1/a,th2/b}.csv
size_t slashes_in_glob = 0;
const size_t next_slash_after_glob_pos = [&]()
{
if (!has_glob)
return suffix_with_globs.find('/', 1);

size_t in_curly = 0;
for (std::string::const_iterator it = ++suffix_with_globs.begin(); it != suffix_with_globs.end(); it++)
{
if (*it == '{')
++in_curly;
else if (*it == '/')
{
if (in_curly)
++slashes_in_glob;
else
return size_t(std::distance(suffix_with_globs.begin(), it));
}
else if (*it == '}')
--in_curly;
}
return std::string::npos;
}();

const std::string current_glob = suffix_with_globs.substr(0, next_slash_after_glob_pos);

auto regexp = makeRegexpPatternFromGlobs(current_glob);

re2::RE2 matcher(regexp);
@@ -125,13 +212,22 @@ void listFilesWithRegexpMatchingImpl(
if (!fs::exists(prefix_without_globs))
return;

const bool looking_for_directory = next_slash_after_glob_pos != std::string::npos;

if (slashes_in_glob)
{
listFilesWithFoldedRegexpMatchingImpl(fs::path(prefix_without_globs), "", suffix_with_globs,
current_glob, matcher, total_bytes_to_read, slashes_in_glob,
next_slash_after_glob_pos, result);
return;
}

const fs::directory_iterator end;
for (fs::directory_iterator it(prefix_without_globs); it != end; ++it)
{
const std::string full_path = it->path().string();
const size_t last_slash = full_path.rfind('/');
const String file_name = full_path.substr(last_slash);
- const bool looking_for_directory = next_slash != std::string::npos;

/// Condition is_directory means what kind of path is it in current iteration of ls
if (!it->is_directory() && !looking_for_directory)
@@ -147,14 +243,12 @@ void listFilesWithRegexpMatchingImpl(
if (recursive)
{
listFilesWithRegexpMatchingImpl(fs::path(full_path).append(it->path().string()) / "" ,
- looking_for_directory ? suffix_with_globs.substr(next_slash) : current_glob ,
+ looking_for_directory ? suffix_with_globs.substr(next_slash_after_glob_pos) : current_glob ,
total_bytes_to_read, result, recursive);
}
else if (looking_for_directory && re2::RE2::FullMatch(file_name, matcher))
- {
/// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
- listFilesWithRegexpMatchingImpl(fs::path(full_path) / "", suffix_with_globs.substr(next_slash), total_bytes_to_read, result);
- }
+ listFilesWithRegexpMatchingImpl(fs::path(full_path) / "", suffix_with_globs.substr(next_slash_after_glob_pos), total_bytes_to_read, result);
}
}
}
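
The folded traversal in `listFilesWithFoldedRegexpMatchingImpl` can be approximated on a local filesystem with the Python sketch below. It is a simplified illustration under stated assumptions: `list_folded` and `demo` are hypothetical names, the regex is written by hand rather than produced by `makeRegexpPatternFromGlobs`, and only the case where the brace group is the last path component is covered (the C++ code hands any remaining suffix back to the regular matcher).

```python
import os
import re
import tempfile


def list_folded(path_for_ls, processed_suffix, matcher, max_depth, result):
    """Collect entries whose path, relative to the starting directory, matches.

    Instead of matching one directory level at a time, the accumulated
    multi-directory suffix is compared against the whole brace pattern.
    """
    if max_depth == 0:
        return  # depth budget exhausted, do not list any deeper
    for entry in os.scandir(path_for_ls):
        candidate = processed_suffix + "/" + entry.name
        if matcher.fullmatch(candidate):
            result.append(entry.path)
        elif entry.is_dir():
            list_folded(entry.path, candidate, matcher, max_depth - 1, result)


def demo():
    with tempfile.TemporaryDirectory() as root:
        for sub in ("p1/path1", "p2/path2", "p3/other"):
            os.makedirs(os.path.join(root, sub))
        # Hand-written equivalent of the glob '/{p1/path1,p2/path2}'.
        matcher = re.compile(r"/(?:p1/path1|p2/path2)")
        found = []
        list_folded(root, "", matcher, 2, found)
        return sorted(os.path.relpath(p, root).replace(os.sep, "/") for p in found)


print(demo())
```

Here the depth budget of 2 equals the number of `/` characters inside the braces, mirroring how the caller passes `slashes_in_glob` as `max_depth`.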
17 changes: 17 additions & 0 deletions tests/integration/test_storage_hdfs/test.py
@@ -85,6 +85,23 @@ def test_read_write_storage_with_globs(started_cluster):
assert "in readonly mode" in str(ex)


def test_storage_with_multidirectory_glob(started_cluster):
hdfs_api = started_cluster.hdfs_api
for i in ["1", "2"]:
hdfs_api.write_data(
f"/multiglob/p{i}/path{i}/postfix/data{i}", f"File{i}\t{i}{i}\n"
)
assert (
hdfs_api.read_data(f"/multiglob/p{i}/path{i}/postfix/data{i}")
== f"File{i}\t{i}{i}\n"
)

r = node1.query(
"SELECT * FROM hdfs('hdfs://hdfs1:9000/multiglob/{p1/path1,p2/path2}/postfix/data{1,2}', TSV)"
)
assert (r == f"File1\t11\nFile2\t22\n") or (r == f"File2\t22\nFile1\t11\n")
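
The final assertion accepts either row order because the order in which the two files are read is not fixed. One possible alternative, shown here only as a sketch with a hypothetical helper `rows_match` that is not part of the test suite, is to compare the rows after sorting:

```python
def rows_match(result, expected_rows):
    """Compare TSV query output with the expected rows, ignoring row order."""
    return sorted(result.splitlines()) == sorted(expected_rows)


print(rows_match("File2\t22\nFile1\t11\n", ["File1\t11", "File2\t22"]))
```

This form stays readable when more than two files are expected, since the number of possible orderings grows quickly.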


def test_read_write_table(started_cluster):
hdfs_api = started_cluster.hdfs_api

@@ -0,0 +1,4 @@
This is file data1 data1.csv
This is file data2 data2.csv
This is file data1 data1.csv
This is file data2 data2.csv
@@ -0,0 +1,10 @@
-- Tags: no-replicated-database, no-parallel

INSERT INTO TABLE FUNCTION file('02771/dir1/subdir11/data1.csv', 'CSV', 's String') SELECT 'This is file data1' SETTINGS engine_file_truncate_on_insert=1;
INSERT INTO TABLE FUNCTION file('02771/dir2/subdir22/data2.csv', 'CSV', 's String') SELECT 'This is file data2' SETTINGS engine_file_truncate_on_insert=1;

SELECT *, _file FROM file('02771/dir{?/subdir?1/da,2/subdir2?/da}ta1.csv', CSV);
SELECT *, _file FROM file('02771/dir{?/subdir?1/da,2/subdir2?/da}ta2.csv', CSV);

SELECT *, _file FROM file('02771/dir?/{subdir?1/data1,subdir2?/data2}.csv', CSV) WHERE _file == 'data1.csv';
SELECT *, _file FROM file('02771/dir?/{subdir?1/data1,subdir2?/data2}.csv', CSV) WHERE _file == 'data2.csv';
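
To see why the first query above is expected to return only the `data1.csv` row, its glob can be expanded by hand into a regular expression and checked against the two inserted paths. This Python sketch is only an illustration: the regex is a manual translation that assumes `?` matches one character other than `/`, not the pattern ClickHouse generates.

```python
import re

# Hand-translated form of '02771/dir{?/subdir?1/da,2/subdir2?/da}ta1.csv'.
pattern = re.compile(r"02771/dir(?:[^/]/subdir[^/]1/da|2/subdir2[^/]/da)ta1\.csv")

paths = [
    "02771/dir1/subdir11/data1.csv",
    "02771/dir2/subdir22/data2.csv",
]

# Keep only the paths that the whole pattern matches.
matched = [p for p in paths if pattern.fullmatch(p)]
print(matched)
```

Note that the brace group starts in the middle of one path component (`dir`) and ends in the middle of another (`da` + `ta1.csv`), which is why the component-by-component matcher cannot handle it.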