From 5424f235a92331c8ed2dba72b29de24d3ff1e8f7 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Sat, 10 Feb 2024 00:27:15 +0000 Subject: [PATCH 1/2] Fix StorageURL doing some of the query execution in one thread instead of max_threads --- src/Storages/StorageURL.cpp | 6 ++++-- .../0_stateless/02723_parallelize_output_setting.reference | 4 ++++ .../0_stateless/02723_parallelize_output_setting.sql | 3 +++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 433f4ed77001..ce9b0cd366b1 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -904,6 +904,7 @@ class ReadFromURL : public SourceStepWithFilter , context(std::move(context_)) , max_block_size(max_block_size_) , num_streams(num_streams_) + , max_num_streams(num_streams_) { } @@ -920,6 +921,7 @@ class ReadFromURL : public SourceStepWithFilter size_t max_block_size; size_t num_streams; + const size_t max_num_streams; std::shared_ptr iterator_wrapper; bool is_url_with_globs = false; @@ -1093,8 +1095,8 @@ void ReadFromURL::initializePipeline(QueryPipelineBuilder & pipeline, const Buil auto pipe = Pipe::unitePipes(std::move(pipes)); size_t output_ports = pipe.numOutputPorts(); const bool parallelize_output = context->getSettingsRef().parallelize_output_from_storages; - if (parallelize_output && storage->parallelizeOutputAfterReading(context) && output_ports > 0 && output_ports < num_streams) - pipe.resize(num_streams); + if (parallelize_output && storage->parallelizeOutputAfterReading(context) && output_ports > 0 && output_ports < max_num_streams) + pipe.resize(max_num_streams); if (pipe.empty()) pipe = Pipe(std::make_shared(info.source_header)); diff --git a/tests/queries/0_stateless/02723_parallelize_output_setting.reference b/tests/queries/0_stateless/02723_parallelize_output_setting.reference index 0f2a396f471c..36e4e68ecd5b 100644 --- a/tests/queries/0_stateless/02723_parallelize_output_setting.reference +++ 
b/tests/queries/0_stateless/02723_parallelize_output_setting.reference @@ -5,3 +5,7 @@ select startsWith(trimLeft(explain),'Resize') as resize from (explain pipeline s -- no Resize in pipeline set parallelize_output_from_storages=0; select startsWith(trimLeft(explain),'Resize') as resize from (explain pipeline select * from file(data_02723.csv)) where resize; +-- Data from URL source is immediately resized to max_threads streams, before any ExpressionTransform. +set parallelize_output_from_storages=1; +select match(arrayStringConcat(groupArray(explain), ''), '.*Resize 1 → 2 *URL 0 → 1 *$') from (explain pipeline select x, count() from url('https://example.com', Parquet, 'x Int64') group by x order by count() limit 10); +1 diff --git a/tests/queries/0_stateless/02723_parallelize_output_setting.sql b/tests/queries/0_stateless/02723_parallelize_output_setting.sql index 7db28ca4decd..12786b80f69d 100644 --- a/tests/queries/0_stateless/02723_parallelize_output_setting.sql +++ b/tests/queries/0_stateless/02723_parallelize_output_setting.sql @@ -10,3 +10,6 @@ select startsWith(trimLeft(explain),'Resize') as resize from (explain pipeline s set parallelize_output_from_storages=0; select startsWith(trimLeft(explain),'Resize') as resize from (explain pipeline select * from file(data_02723.csv)) where resize; +-- Data from URL source is immediately resized to max_threads streams, before any ExpressionTransform. 
+set parallelize_output_from_storages=1; +select match(arrayStringConcat(groupArray(explain), ''), '.*Resize 1 → 2 *URL 0 → 1 *$') from (explain pipeline select x, count() from url('https://example.com', Parquet, 'x Int64') group by x order by count() limit 10); \ No newline at end of file From b5fd68a2b6b9c50accc62671f1b87ea4cd30785c Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Sat, 10 Feb 2024 03:40:55 +0000 Subject: [PATCH 2/2] no-fasttest --- tests/queries/0_stateless/02723_parallelize_output_setting.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/02723_parallelize_output_setting.sql b/tests/queries/0_stateless/02723_parallelize_output_setting.sql index 12786b80f69d..86e6d4b4e3db 100644 --- a/tests/queries/0_stateless/02723_parallelize_output_setting.sql +++ b/tests/queries/0_stateless/02723_parallelize_output_setting.sql @@ -1,4 +1,4 @@ --- Tags: no-parallel +-- Tags: no-parallel, no-fasttest insert into function file(data_02723.csv) select number from numbers(5) settings engine_file_truncate_on_insert=1;