Skip to content

Commit

Permalink
Merge pull request #59545 from ClickHouse/system_zookeeper_stream
Browse files Browse the repository at this point in the history
Produce stream of chunks from system.zookeeper instead of accumulating the whole result
  • Loading branch information
davenger committed Feb 7, 2024
2 parents 40e10d1 + ca4f46a commit 4dcd806
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 25 deletions.
99 changes: 74 additions & 25 deletions src/Storages/System/StorageSystemZooKeeper.cpp
Expand Up @@ -180,7 +180,7 @@ using Paths = std::deque<std::pair<String, ZkPathType>>;
class ReadFromSystemZooKeeper final : public SourceStepWithFilter
{
public:
ReadFromSystemZooKeeper(const Block & header, SelectQueryInfo & query_info_, ContextPtr context_);
ReadFromSystemZooKeeper(const Block & header, SelectQueryInfo & query_info_, UInt64 max_block_size_, ContextPtr context_);

String getName() const override { return "ReadFromSystemZooKeeper"; }

Expand All @@ -189,13 +189,41 @@ class ReadFromSystemZooKeeper final : public SourceStepWithFilter
void applyFilters() override;

private:
void fillData(MutableColumns & res_columns);

std::shared_ptr<const StorageLimitsList> storage_limits;
const UInt64 max_block_size;
ContextPtr context;
Paths paths;
};


class SystemZooKeeperSource : public ISource
{
public:
SystemZooKeeperSource(
Paths && paths_,
Block header_,
UInt64 max_block_size_,
ContextPtr context_)
: ISource(header_)
, max_block_size(max_block_size_)
, paths(std::move(paths_))
, context(std::move(context_))
{
}

String getName() const override { return "SystemZooKeeper"; }

protected:
Chunk generate() override;

private:
const UInt64 max_block_size;
Paths paths;
ContextPtr context;
bool started = false;
};


StorageSystemZooKeeper::StorageSystemZooKeeper(const StorageID & table_id_)
: IStorage(table_id_)
{
Expand All @@ -211,11 +239,11 @@ void StorageSystemZooKeeper::read(
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
size_t max_block_size,
size_t /*num_streams*/)
{
auto header = storage_snapshot->metadata->getSampleBlockWithVirtuals(getVirtuals());
auto read_step = std::make_unique<ReadFromSystemZooKeeper>(header, query_info, context);
auto read_step = std::make_unique<ReadFromSystemZooKeeper>(header, query_info, max_block_size, context);
query_plan.addStep(std::move(read_step));
}

Expand Down Expand Up @@ -414,7 +442,7 @@ static Paths extractPath(const ActionsDAG::NodeRawConstPtrs & filter_nodes, Cont
for (const auto * node : filter_nodes)
extractPathImpl(*node, res, context, allow_unrestricted);

if (filter_nodes.empty() && allow_unrestricted)
if (res.empty() && allow_unrestricted)
res.emplace_back("/", ZkPathType::Recurse);

return res;
Expand All @@ -426,8 +454,26 @@ void ReadFromSystemZooKeeper::applyFilters()
paths = extractPath(getFilterNodes().nodes, context, context->getSettingsRef().allow_unrestricted_reads_from_keeper);
}

void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)

Chunk SystemZooKeeperSource::generate()
{
if (paths.empty())
{
if (!started)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"SELECT from system.zookeeper table must contain condition like path = 'path' "
"or path IN ('path1','path2'...) or path IN (subquery) "
"in WHERE clause unless `set allow_unrestricted_reads_from_keeper = 'true'`.");

/// No more work
return {};
}

started = true;

MutableColumns res_columns = getPort().getHeader().cloneEmptyColumns();
size_t row_count = 0;

QueryStatusPtr query_status = context->getProcessListElement();

const auto & settings = context->getSettingsRef();
Expand All @@ -453,12 +499,6 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
return zookeeper;
};

if (paths.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"SELECT from system.zookeeper table must contain condition like path = 'path' "
"or path IN ('path1','path2'...) or path IN (subquery) "
"in WHERE clause unless `set allow_unrestricted_reads_from_keeper = 'true'`.");

const Int64 max_inflight_requests = std::max<Int64>(1, context->getSettingsRef().max_download_threads.value);

struct ListTask
Expand All @@ -476,6 +516,16 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
if (query_status)
query_status->checkTimeLimit();

/// Check if the block is big enough already
if (max_block_size > 0 && row_count > 0)
{
size_t total_size = 0;
for (const auto & column : res_columns)
total_size += column->byteSize();
if (total_size > max_block_size)
break;
}

list_tasks.clear();
std::vector<String> paths_to_list;
while (!paths.empty() && static_cast<Int64>(list_tasks.size()) < max_inflight_requests)
Expand Down Expand Up @@ -519,8 +569,8 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
continue;

auto & task = list_tasks[list_task_idx];
if (auto elem = context->getProcessListElement())
elem->checkTimeLimit();
if (query_status)
query_status->checkTimeLimit();

Strings nodes = std::move(list_result.names);

Expand Down Expand Up @@ -557,8 +607,8 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)

auto & get_task = get_tasks[i];
auto & list_task = list_tasks[get_task.list_task_idx];
if (auto elem = context->getProcessListElement())
elem->checkTimeLimit();
if (query_status)
query_status->checkTimeLimit();

// Deduplication
String key = list_task.path_part + '/' + get_task.node;
Expand All @@ -584,31 +634,30 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
res_columns[col_num++]->insert(
list_task.path); /// This is the original path. In order to process the request, condition in WHERE should be triggered.

++row_count;

if (list_task.path_type != ZkPathType::Exact && res.stat.numChildren > 0)
{
paths.emplace_back(key, ZkPathType::Recurse);
}
}
}

return Chunk(std::move(res_columns), row_count);
}

ReadFromSystemZooKeeper::ReadFromSystemZooKeeper(const Block & header, SelectQueryInfo & query_info, ContextPtr context_)
ReadFromSystemZooKeeper::ReadFromSystemZooKeeper(const Block & header, SelectQueryInfo & query_info, UInt64 max_block_size_, ContextPtr context_)
: SourceStepWithFilter({.header = header})
, storage_limits(query_info.storage_limits)
, max_block_size(max_block_size_)
, context(std::move(context_))
{
}

void ReadFromSystemZooKeeper::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
const auto & header = getOutputStream().header;
MutableColumns res_columns = header.cloneEmptyColumns();
fillData(res_columns);

UInt64 num_rows = res_columns.at(0)->size();
Chunk chunk(std::move(res_columns), num_rows);

auto source = std::make_shared<SourceFromSingleChunk>(header, std::move(chunk));
auto source = std::make_shared<SystemZooKeeperSource>(std::move(paths), header, max_block_size, context);
source->setStorageLimits(storage_limits);
processors.emplace_back(source);
pipeline.init(Pipe(std::move(source)));
Expand Down
@@ -0,0 +1,6 @@
1
1
1
1
1
1
17 changes: 17 additions & 0 deletions tests/queries/0_stateless/02976_system_zookeeper_filters.sql
@@ -0,0 +1,17 @@
-- Tags: zookeeper, no-parallel, no-fasttest, long

SET allow_unrestricted_reads_from_keeper = 'false';

SELECT count() > 0 FROM system.zookeeper; -- { serverError BAD_ARGUMENTS }
SELECT count() > 0 FROM system.zookeeper WHERE name LIKE '%_%'; -- { serverError BAD_ARGUMENTS }
SELECT count() > 0 FROM system.zookeeper WHERE value LIKE '%'; -- { serverError BAD_ARGUMENTS }
SELECT count() > 0 FROM system.zookeeper WHERE path LIKE '/%'; -- { serverError BAD_ARGUMENTS }
SELECT count() > 0 FROM system.zookeeper WHERE path = '/';

SET allow_unrestricted_reads_from_keeper = 'true';

SELECT count() > 0 FROM system.zookeeper;
SELECT count() > 0 FROM system.zookeeper WHERE name LIKE '%_%';
SELECT count() > 0 FROM system.zookeeper WHERE value LIKE '%';
SELECT count() > 0 FROM system.zookeeper WHERE path LIKE '/%';
SELECT count() > 0 FROM system.zookeeper WHERE path = '/';

0 comments on commit 4dcd806

Please sign in to comment.