Skip to content

Commit

Permalink
Merge pull request #54145 from ClickHouse/backport/23.6/54122
Browse files Browse the repository at this point in the history
Backport #54122 to 23.6: Fix rows_before_limit_at_least for DelayedSource.
  • Loading branch information
KochetovNicolai committed Sep 1, 2023
2 parents 30e0abc + cc3b4fc commit 98eab9c
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 2 deletions.
6 changes: 6 additions & 0 deletions src/Processors/Sources/DelayedSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ void DelayedSource::work()

processors = Pipe::detachProcessors(std::move(pipe));

if (rows_before_limit)
{
for (auto & processor : processors)
processor->setRowsBeforeLimitCounter(rows_before_limit);
}

synchronizePorts(totals_output, totals, header, processors);
synchronizePorts(extremes_output, extremes, header, processors);
}
Expand Down
3 changes: 3 additions & 0 deletions src/Processors/Sources/DelayedSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ class DelayedSource : public IProcessor
OutputPort * getTotalsPort() { return totals; }
OutputPort * getExtremesPort() { return extremes; }

void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit.swap(counter); }

private:
QueryPlanResourceHolder resources;
Creator creator;
Processors processors;
RowsBeforeLimitCounterPtr rows_before_limit;

/// Outputs for DelayedSource.
OutputPort * main = nullptr;
Expand Down
5 changes: 3 additions & 2 deletions src/QueryPipeline/QueryPipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Sources/DelayedSource.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/SourceFromChunks.h>
Expand Down Expand Up @@ -164,7 +165,7 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
/// 5. Limit ... : Set counter on the input port of Limit

/// Case 1.
if (typeid_cast<RemoteSource *>(processor) && !limit_processor)
if ((typeid_cast<RemoteSource *>(processor) || typeid_cast<DelayedSource *>(processor)) && !limit_processor)
{
processors.emplace_back(processor);
continue;
Expand Down Expand Up @@ -199,7 +200,7 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
}

/// Case 4.
if (typeid_cast<RemoteSource *>(processor))
if (typeid_cast<RemoteSource *>(processor) || typeid_cast<DelayedSource *>(processor))
{
processors.emplace_back(processor);
limit_candidates[limit_processor].push_back(limit_input_port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,23 @@
3
3
1
{
"meta":
[
{
"name": "sum(a)",
"type": "Int64"
}
],

"data":
[
{
"sum(a)": "1"
}
],

"rows": 1,

"rows_before_limit_at_least": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ SYSTEM ENABLE FAILPOINT use_delayed_remote_source;

SELECT sum(a) FROM remote('127.0.0.4', currentDatabase(), '02863_delayed_source') WITH TOTALS SETTINGS extremes = 1;
SELECT max(explain like '%Delayed%') FROM (EXPLAIN PIPELINE graph=1 SELECT sum(a) FROM remote('127.0.0.4', currentDatabase(), '02863_delayed_source') WITH TOTALS SETTINGS extremes = 1);
SELECT sum(a) FROM remote('127.0.0.4', currentDatabase(), '02863_delayed_source') GROUP BY a ORDER BY a LIMIT 1 FORMAT JSON settings output_format_write_statistics=0;

SYSTEM DISABLE FAILPOINT use_delayed_remote_source;

Expand Down

0 comments on commit 98eab9c

Please sign in to comment.