Skip to content

Commit

Permalink
Merge pull request #50559 from zvonand/zvonand-issue-49290
Browse files Browse the repository at this point in the history
Add support for multi-directory globs
  • Loading branch information
robot-clickhouse-ci-1 committed Jul 19, 2023
2 parents 6c62c3b + 4884022 commit 32b765a
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 19 deletions.
2 changes: 1 addition & 1 deletion docs/en/sql-reference/table-functions/file.md
Expand Up @@ -134,7 +134,7 @@ Multiple path components can have globs. For being processed file must exist and

- `*` — Substitutes any number of any characters except `/` including empty string.
- `?` — Substitutes any single character.
- `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`.
- `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`, including `/`.
- `{N..M}` — Substitutes any number in range from N to M including both borders.
- `**` - Fetches all files inside the folder recursively.

Expand Down
2 changes: 1 addition & 1 deletion docs/ru/sql-reference/table-functions/file.md
Expand Up @@ -79,7 +79,7 @@ SELECT * FROM file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 U

- `*` — заменяет любое количество любых символов кроме `/`, включая отсутствие символов.
- `?` — заменяет ровно один любой символ.
- `{some_string,another_string,yet_another_one}` — заменяет любую из строк `'some_string', 'another_string', 'yet_another_one'`.
- `{some_string,another_string,yet_another_one}` — заменяет любую из строк `'some_string', 'another_string', 'yet_another_one'`, причём строка может содержать `/`.
- `{N..M}` — заменяет любое число в интервале от `N` до `M` включительно (может содержать ведущие нули).

Конструкция с `{}` аналогична табличной функции [remote](remote.md).
Expand Down
122 changes: 115 additions & 7 deletions src/Storages/HDFS/StorageHDFS.cpp
Expand Up @@ -64,23 +64,131 @@ namespace ErrorCodes
}
namespace
{
/// Forward-declared to use in LSWithFoldedRegexpMatching w/o circular dependency.
std::vector<StorageHDFS::PathWithInfo> LSWithRegexpMatching(const String & path_for_ls,
const HDFSFSPtr & fs,
const String & for_match);

/*
* When `{...}` has any `/`s, it must be processed in a different way:
* Basically, a path with globs is processed by LSWithRegexpMatching. In case it detects multi-dir glob {.../..., .../...},
* LSWithFoldedRegexpMatching is in charge from now on.
* It works a bit different: it still recursively goes through subdirectories, but does not match every directory to glob.
* Instead, it goes many levels down (until the approximate max_depth is reached) and compares this multi-dir path to a glob.
* StorageFile.cpp has the same logic.
*/
std::vector<StorageHDFS::PathWithInfo> LSWithFoldedRegexpMatching(const String & path_for_ls,
const HDFSFSPtr & fs,
const String & processed_suffix,
const String & suffix_with_globs,
re2::RE2 & matcher,
const size_t max_depth,
const size_t next_slash_after_glob_pos)
{
/// We don't need to go all the way in every directory if max_depth is reached
/// as it is upper limit of depth by simply counting `/`s in curly braces
if (!max_depth)
return {};

HDFSFileInfo ls;
ls.file_info = hdfsListDirectory(fs.get(), path_for_ls.data(), &ls.length);
if (ls.file_info == nullptr && errno != ENOENT) // NOLINT
{
// ignore file not found exception, keep throw other exception, libhdfs3 doesn't have function to get exception type, so use errno.
throw Exception(
ErrorCodes::ACCESS_DENIED, "Cannot list directory {}: {}", path_for_ls, String(hdfsGetLastError()));
}

std::vector<StorageHDFS::PathWithInfo> result;

if (!ls.file_info && ls.length > 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "file_info shouldn't be null");

for (int i = 0; i < ls.length; ++i)
{
const String full_path = String(ls.file_info[i].mName);
const size_t last_slash = full_path.rfind('/');
const String dir_or_file_name = full_path.substr(last_slash);
const bool is_directory = ls.file_info[i].mKind == 'D';

if (re2::RE2::FullMatch(processed_suffix + dir_or_file_name, matcher))
{
if (next_slash_after_glob_pos == std::string::npos)
{
result.emplace_back(
String(ls.file_info[i].mName),
StorageHDFS::PathInfo{ls.file_info[i].mLastMod, static_cast<size_t>(ls.file_info[i].mSize)});
}
else
{
std::vector<StorageHDFS::PathWithInfo> result_part = LSWithRegexpMatching(
fs::path(full_path) / "" , fs, suffix_with_globs.substr(next_slash_after_glob_pos));
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
}
else if (is_directory)
{
std::vector<StorageHDFS::PathWithInfo> result_part = LSWithFoldedRegexpMatching(
fs::path(full_path), fs, processed_suffix + dir_or_file_name,
suffix_with_globs, matcher, max_depth - 1, next_slash_after_glob_pos);
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
}
return result;
}

/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageFile.
*/
std::vector<StorageHDFS::PathWithInfo> LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match)
std::vector<StorageHDFS::PathWithInfo> LSWithRegexpMatching(
const String & path_for_ls,
const HDFSFSPtr & fs,
const String & for_match)
{
const size_t first_glob = for_match.find_first_of("*?{");
const size_t first_glob_pos = for_match.find_first_of("*?{");
const bool has_glob = first_glob_pos != std::string::npos;

const size_t end_of_path_without_globs = for_match.substr(0, first_glob).rfind('/');
const size_t end_of_path_without_globs = for_match.substr(0, first_glob_pos).rfind('/');
const String suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begin with '/'
const String prefix_without_globs = path_for_ls + for_match.substr(1, end_of_path_without_globs); /// ends with '/'

const size_t next_slash = suffix_with_globs.find('/', 1);
re2::RE2 matcher(makeRegexpPatternFromGlobs(suffix_with_globs.substr(0, next_slash)));
size_t slashes_in_glob = 0;
const size_t next_slash_after_glob_pos = [&]()
{
if (!has_glob)
return suffix_with_globs.find('/', 1);

size_t in_curly = 0;
for (std::string::const_iterator it = ++suffix_with_globs.begin(); it != suffix_with_globs.end(); it++)
{
if (*it == '{')
++in_curly;
else if (*it == '/')
{
if (in_curly)
++slashes_in_glob;
else
return size_t(std::distance(suffix_with_globs.begin(), it));
}
else if (*it == '}')
--in_curly;
}
return std::string::npos;
}();

const std::string current_glob = suffix_with_globs.substr(0, next_slash_after_glob_pos);

re2::RE2 matcher(makeRegexpPatternFromGlobs(current_glob));
if (!matcher.ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", for_match, matcher.error());

if (slashes_in_glob)
{
return LSWithFoldedRegexpMatching(fs::path(prefix_without_globs), fs, "", suffix_with_globs,
matcher, slashes_in_glob, next_slash_after_glob_pos);
}

HDFSFileInfo ls;
ls.file_info = hdfsListDirectory(fs.get(), prefix_without_globs.data(), &ls.length);
if (ls.file_info == nullptr && errno != ENOENT) // NOLINT
Expand All @@ -97,7 +205,7 @@ namespace
const String full_path = String(ls.file_info[i].mName);
const size_t last_slash = full_path.rfind('/');
const String file_name = full_path.substr(last_slash);
const bool looking_for_directory = next_slash != std::string::npos;
const bool looking_for_directory = next_slash_after_glob_pos != std::string::npos;
const bool is_directory = ls.file_info[i].mKind == 'D';
/// Condition with type of current file_info means what kind of path is it in current iteration of ls
if (!is_directory && !looking_for_directory)
Expand All @@ -111,7 +219,7 @@ namespace
{
if (re2::RE2::FullMatch(file_name, matcher))
{
std::vector<StorageHDFS::PathWithInfo> result_part = LSWithRegexpMatching(fs::path(full_path) / "", fs, suffix_with_globs.substr(next_slash));
std::vector<StorageHDFS::PathWithInfo> result_part = LSWithRegexpMatching(fs::path(full_path) / "", fs, suffix_with_globs.substr(next_slash_after_glob_pos));
/// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
Expand Down
113 changes: 103 additions & 10 deletions src/Storages/StorageFile.cpp
Expand Up @@ -93,6 +93,65 @@ namespace ErrorCodes
namespace
{

/// Forward-declare to use in listFilesWithFoldedRegexpMatchingImpl()
void listFilesWithRegexpMatchingImpl(
const std::string & path_for_ls,
const std::string & for_match,
size_t & total_bytes_to_read,
std::vector<std::string> & result,
bool recursive = false);

/*
* When `{...}` has any `/`s, it must be processed in a different way:
* Basically, a path with globs is processed by listFilesWithRegexpMatchingImpl. In case it detects multi-dir glob {.../..., .../...},
* listFilesWithFoldedRegexpMatchingImpl is in charge from now on.
* It works a bit different: it still recursively goes through subdirectories, but does not match every directory to glob.
* Instead, it goes many levels down (until the approximate max_depth is reached) and compares this multi-dir path to a glob.
* StorageHDFS.cpp has the same logic.
*/
void listFilesWithFoldedRegexpMatchingImpl(const std::string & path_for_ls,
const std::string & processed_suffix,
const std::string & suffix_with_globs,
re2::RE2 & matcher,
size_t & total_bytes_to_read,
const size_t max_depth,
const size_t next_slash_after_glob_pos,
std::vector<std::string> & result)
{
if (!max_depth)
return;

const fs::directory_iterator end;
for (fs::directory_iterator it(path_for_ls); it != end; ++it)
{
const std::string full_path = it->path().string();
const size_t last_slash = full_path.rfind('/');
const String dir_or_file_name = full_path.substr(last_slash);

if (re2::RE2::FullMatch(processed_suffix + dir_or_file_name, matcher))
{
if (next_slash_after_glob_pos == std::string::npos)
{
total_bytes_to_read += it->file_size();
result.push_back(it->path().string());
}
else
{
listFilesWithRegexpMatchingImpl(fs::path(full_path) / "" ,
suffix_with_globs.substr(next_slash_after_glob_pos),
total_bytes_to_read, result);
}
}
else if (it->is_directory())
{
listFilesWithFoldedRegexpMatchingImpl(fs::path(full_path), processed_suffix + dir_or_file_name,
suffix_with_globs, matcher, total_bytes_to_read,
max_depth - 1, next_slash_after_glob_pos, result);
}

}
}

/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageHDFS.
*/
Expand All @@ -101,15 +160,42 @@ void listFilesWithRegexpMatchingImpl(
const std::string & for_match,
size_t & total_bytes_to_read,
std::vector<std::string> & result,
bool recursive = false)
bool recursive)
{
const size_t first_glob = for_match.find_first_of("*?{");
const size_t first_glob_pos = for_match.find_first_of("*?{");
const bool has_glob = first_glob_pos != std::string::npos;

const size_t end_of_path_without_globs = for_match.substr(0, first_glob).rfind('/');
const size_t end_of_path_without_globs = for_match.substr(0, first_glob_pos).rfind('/');
const std::string suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begin with '/'

const size_t next_slash = suffix_with_globs.find('/', 1);
const std::string current_glob = suffix_with_globs.substr(0, next_slash);
/// slashes_in_glob counter is a upper-bound estimate of recursion depth
/// needed to process complex cases when `/` is included into glob, e.g. /pa{th1/a,th2/b}.csv
size_t slashes_in_glob = 0;
const size_t next_slash_after_glob_pos = [&]()
{
if (!has_glob)
return suffix_with_globs.find('/', 1);

size_t in_curly = 0;
for (std::string::const_iterator it = ++suffix_with_globs.begin(); it != suffix_with_globs.end(); it++)
{
if (*it == '{')
++in_curly;
else if (*it == '/')
{
if (in_curly)
++slashes_in_glob;
else
return size_t(std::distance(suffix_with_globs.begin(), it));
}
else if (*it == '}')
--in_curly;
}
return std::string::npos;
}();

const std::string current_glob = suffix_with_globs.substr(0, next_slash_after_glob_pos);

auto regexp = makeRegexpPatternFromGlobs(current_glob);

re2::RE2 matcher(regexp);
Expand All @@ -126,13 +212,22 @@ void listFilesWithRegexpMatchingImpl(
if (!fs::exists(prefix_without_globs))
return;

const bool looking_for_directory = next_slash_after_glob_pos != std::string::npos;

if (slashes_in_glob)
{
listFilesWithFoldedRegexpMatchingImpl(fs::path(prefix_without_globs), "", suffix_with_globs,
matcher, total_bytes_to_read, slashes_in_glob,
next_slash_after_glob_pos, result);
return;
}

const fs::directory_iterator end;
for (fs::directory_iterator it(prefix_without_globs); it != end; ++it)
{
const std::string full_path = it->path().string();
const size_t last_slash = full_path.rfind('/');
const String file_name = full_path.substr(last_slash);
const bool looking_for_directory = next_slash != std::string::npos;

/// Condition is_directory means what kind of path is it in current iteration of ls
if (!it->is_directory() && !looking_for_directory)
Expand All @@ -148,14 +243,12 @@ void listFilesWithRegexpMatchingImpl(
if (recursive)
{
listFilesWithRegexpMatchingImpl(fs::path(full_path).append(it->path().string()) / "" ,
looking_for_directory ? suffix_with_globs.substr(next_slash) : current_glob ,
looking_for_directory ? suffix_with_globs.substr(next_slash_after_glob_pos) : current_glob ,
total_bytes_to_read, result, recursive);
}
else if (looking_for_directory && re2::RE2::FullMatch(file_name, matcher))
{
/// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
listFilesWithRegexpMatchingImpl(fs::path(full_path) / "", suffix_with_globs.substr(next_slash), total_bytes_to_read, result);
}
listFilesWithRegexpMatchingImpl(fs::path(full_path) / "", suffix_with_globs.substr(next_slash_after_glob_pos), total_bytes_to_read, result);
}
}
}
Expand Down
26 changes: 26 additions & 0 deletions tests/integration/test_storage_hdfs/test.py
Expand Up @@ -85,6 +85,32 @@ def test_read_write_storage_with_globs(started_cluster):
assert "in readonly mode" in str(ex)


def test_storage_with_multidirectory_glob(started_cluster):
hdfs_api = started_cluster.hdfs_api
for i in ["1", "2"]:
hdfs_api.write_data(
f"/multiglob/p{i}/path{i}/postfix/data{i}", f"File{i}\t{i}{i}\n"
)
assert (
hdfs_api.read_data(f"/multiglob/p{i}/path{i}/postfix/data{i}")
== f"File{i}\t{i}{i}\n"
)

r = node1.query(
"SELECT * FROM hdfs('hdfs://hdfs1:9000/multiglob/{p1/path1,p2/path2}/postfix/data{1,2}', TSV)"
)
assert (r == f"File1\t11\nFile2\t22\n") or (r == f"File2\t22\nFile1\t11\n")

try:
node1.query(
"SELECT * FROM hdfs('hdfs://hdfs1:9000/multiglob/{p4/path1,p2/path3}/postfix/data{1,2}.nonexist', TSV)"
)
assert False, "Exception have to be thrown"
except Exception as ex:
print(ex)
assert "no files" in str(ex)


def test_read_write_table(started_cluster):
hdfs_api = started_cluster.hdfs_api

Expand Down
@@ -0,0 +1,4 @@
This is file data1 data1.csv
This is file data2 data2.csv
This is file data1 data1.csv
This is file data2 data2.csv
@@ -0,0 +1,12 @@
-- Tags: no-replicated-database, no-parallel

SELECT *, _file FROM file('02771/dir{?/subdir?1/da,2/subdir2?/da}ta/non_existing.csv', CSV); -- {serverError CANNOT_EXTRACT_TABLE_STRUCTURE}

INSERT INTO TABLE FUNCTION file('02771/dir1/subdir11/data1.csv', 'CSV', 's String') SELECT 'This is file data1' SETTINGS engine_file_truncate_on_insert=1;
INSERT INTO TABLE FUNCTION file('02771/dir2/subdir22/data2.csv', 'CSV', 's String') SELECT 'This is file data2' SETTINGS engine_file_truncate_on_insert=1;

SELECT *, _file FROM file('02771/dir{?/subdir?1/da,2/subdir2?/da}ta1.csv', CSV);
SELECT *, _file FROM file('02771/dir{?/subdir?1/da,2/subdir2?/da}ta2.csv', CSV);

SELECT *, _file FROM file('02771/dir?/{subdir?1/data1,subdir2?/data2}.csv', CSV) WHERE _file == 'data1.csv';
SELECT *, _file FROM file('02771/dir?/{subdir?1/data1,subdir2?/data2}.csv', CSV) WHERE _file == 'data2.csv';

0 comments on commit 32b765a

Please sign in to comment.