Skip to content

Commit

Permalink
Merge pull request #59833 from ClickHouse/ns
Browse files Browse the repository at this point in the history
Fix StorageURL doing some of the query execution in single thread
  • Loading branch information
alexey-milovidov committed Feb 11, 2024
2 parents 789bca1 + b5fd68a commit c18f713
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 3 deletions.
6 changes: 4 additions & 2 deletions src/Storages/StorageURL.cpp
Expand Up @@ -904,6 +904,7 @@ class ReadFromURL : public SourceStepWithFilter
, context(std::move(context_))
, max_block_size(max_block_size_)
, num_streams(num_streams_)
, max_num_streams(num_streams_)
{
}

Expand All @@ -920,6 +921,7 @@ class ReadFromURL : public SourceStepWithFilter

size_t max_block_size;
size_t num_streams;
const size_t max_num_streams;

std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper;
bool is_url_with_globs = false;
Expand Down Expand Up @@ -1093,8 +1095,8 @@ void ReadFromURL::initializePipeline(QueryPipelineBuilder & pipeline, const Buil
auto pipe = Pipe::unitePipes(std::move(pipes));
size_t output_ports = pipe.numOutputPorts();
const bool parallelize_output = context->getSettingsRef().parallelize_output_from_storages;
if (parallelize_output && storage->parallelizeOutputAfterReading(context) && output_ports > 0 && output_ports < num_streams)
pipe.resize(num_streams);
if (parallelize_output && storage->parallelizeOutputAfterReading(context) && output_ports > 0 && output_ports < max_num_streams)
pipe.resize(max_num_streams);

if (pipe.empty())
pipe = Pipe(std::make_shared<NullSource>(info.source_header));
Expand Down
Expand Up @@ -5,3 +5,7 @@ select startsWith(trimLeft(explain),'Resize') as resize from (explain pipeline s
-- no Resize in pipeline
set parallelize_output_from_storages=0;
select startsWith(trimLeft(explain),'Resize') as resize from (explain pipeline select * from file(data_02723.csv)) where resize;
-- Data from URL source is immediately resized to max_threads streams, before any ExpressionTransform.
set parallelize_output_from_storages=1;
select match(arrayStringConcat(groupArray(explain), ''), '.*Resize 1 → 2 *URL 0 → 1 *$') from (explain pipeline select x, count() from url('https://example.com', Parquet, 'x Int64') group by x order by count() limit 10);
1
@@ -1,4 +1,4 @@
-- Tags: no-parallel
-- Tags: no-parallel, no-fasttest

insert into function file(data_02723.csv) select number from numbers(5) settings engine_file_truncate_on_insert=1;

Expand All @@ -10,3 +10,6 @@ select startsWith(trimLeft(explain),'Resize') as resize from (explain pipeline s
set parallelize_output_from_storages=0;
select startsWith(trimLeft(explain),'Resize') as resize from (explain pipeline select * from file(data_02723.csv)) where resize;

-- Data from URL source is immediately resized to max_threads streams, before any ExpressionTransform.
set parallelize_output_from_storages=1;
select match(arrayStringConcat(groupArray(explain), ''), '.*Resize 1 → 2 *URL 0 → 1 *$') from (explain pipeline select x, count() from url('https://example.com', Parquet, 'x Int64') group by x order by count() limit 10);

0 comments on commit c18f713

Please sign in to comment.