Global Aggregation over Kafka Stream #54870
Conversation
This is an automatic comment. The PR description does not match the template. Please edit it accordingly. The error is: Changelog entry required for category 'New Feature'.
This is an automated comment for commit 7710a92 with a description of the existing statuses. It is updated for the latest CI run. ❌ Click here to open a full report in a separate page.
Force-pushed from ab75a79 to c36c139.
Force-pushed from aaa32a5 to 80d13be.
No idea why the docs check failed. @KochetovNicolai, could you help point out the missing part regarding the doc?
Ok, we need to add the doc in the source code as well.
@@ -0,0 +1,1014 @@
#pragma once
We chose to copy and revise the Aggregator because we want to avoid impacting the efficiency / performance of historical data processing while keeping the streaming path performant and efficient. The new aggregator will have these major tweaks / behavioral differences, now and going forward (see the sketch after this list):
- Keep states around after an intermediate aggregation emit. (We have this in the current PR.)
- Garbage-collect aggregate states as the watermark progresses, for streaming window processing (future PR).
- Interface changes: Block => Chunk, to make it more lightweight / efficient for streaming processing (future PR).
- State checkpointing and restore. In streaming processing, when checkpointing is enabled, we will need to checkpoint the in-memory state and restore it when the query is recovered (future PR).
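To make the first bullet concrete, here is a minimal self-contained sketch (hypothetical types, not GlobalAggregatingTransform or the copied Aggregator) of emitting intermediate results while keeping the in-memory aggregate states alive, as opposed to historical processing where states are finalized once the input ends:

```cpp
#include <iostream>
#include <map>
#include <string>
#include <vector>

/// Hypothetical streaming count aggregation: every emit is a snapshot of the
/// current states, and the states are never destroyed between emits.
struct StreamingCountAggregator
{
    std::map<std::string, size_t> states; /// long-lived aggregate states

    void consume(const std::vector<std::string> & keys)
    {
        for (const auto & key : keys)
            ++states[key];
    }

    /// Emit a snapshot of the current results; `states` is NOT cleared,
    /// so later chunks keep accumulating into the same states.
    std::map<std::string, size_t> emitIntermediate() const { return states; }
};

int main()
{
    StreamingCountAggregator agg;

    agg.consume({"a", "b", "a"});
    for (const auto & [k, v] : agg.emitIntermediate())
        std::cout << k << " " << v << "\n"; /// a 2, b 1

    agg.consume({"a"});
    for (const auto & [k, v] : agg.emitIntermediate())
        std::cout << k << " " << v << "\n"; /// a 3, b 1: states were kept across emits
}
```

The key point is that emitIntermediate() only snapshots the states and never clears them, so subsequent chunks keep folding into the same states.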
Just a few comments as I started to go through the code.
        throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "{}: the max interval kind supported is DAY.", msg);
    }
}
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "{}", msg);
Please have a specific message here. We have a test that checks that we have meaningful message patterns.
Will tackle and revise it.
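A possible shape for the revised fallback message, illustrative only (the surrounding context and exact wording are the author's call):

```cpp
/// Illustrative sketch: give the fallback throw a self-describing pattern
/// instead of forwarding a bare "{}" placeholder.
throw Exception(
    ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
    "Illegal interval argument in EMIT clause: {}",
    msg);
```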
Force-pushed from b8bc02a to b9ea0ef.
Force-pushed from 43f18b3 to 7e6ad75.
Force-pushed from 41fc28f to 7710a92.
Stateless test failure: 01459_manual_write_to_replicas_quorum
In general I like the changes (I briefly checked GlobalAggregatingTransform and AggregatingTransform), however what is expected regarding watermarks is unclear. Could you please write either a comment in the code or a comment here on GitHub about watermarks:
- What can we expect regarding them? They are produced in a monotonic way, but can they be mixed/shuffled in the pipeline, and if so, is that a problem?
- What extra guarantees or functionality do they give? Do they have any guarantees, or are they just best effort?
- If you have any good documentation for Proton, an article, or some other nice description of the watermarks you have implemented, that is also very welcome.
@@ -63,6 +63,10 @@ struct TreeRewriterResult
    /// Note: not used further.
    NameToNameMap array_join_name_to_alias;

    bool streaming = true;
I would say this is a bit of a dangerous default value. Let's set it to false by default.
Agreed. Timeplus Proton is streaming by default; ClickHouse should be different.
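A one-struct sketch of the agreed change (the field comes from the quoted diff; the struct name here is shortened for illustration):

```cpp
/// Sketch of the agreed revision: streaming analysis becomes opt-in, not the default.
struct TreeRewriterResultSketch
{
    bool streaming = false; /// was `true` in the PR as submitted
};
```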
if (settings.optimize_uniq_to_count)
{
    RewriteUniqToCountMatcher::Data data_rewrite_uniq_count;
    RewriteUniqToCountVisitor(data_rewrite_uniq_count).visit(query_ptr);
}
Isn't this a duplicate of lines 447-451?
Yes, I think this one is a cherry-pick (from lots of changes) and a diff mistake. We will remove these lines.
/// Optional `STREAM` keyword
ParserKeyword("STREAM").ignore(pos, expected);
Why do we have `STREAM` if we don't do anything with it? Is this some kind of conformance thing?
Yes, it is more like a conformance thing. We can remove it. https://arxiv.org/pdf/1905.12133.pdf
if (periodic_interval)
    throw Exception(ErrorCodes::SYNTAX_ERROR, "Can not use repeat 'PERIODIC' in EMIT clause");
How can this be true with a local variable?
A partial cherry-pick caused this issue: https://github.com/timeplus-io/proton/blob/develop/src/Parsers/Streaming/ParserEmitQuery.cpp#L50-L60
We will revise this.
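For context, a self-contained, simplified sketch (hypothetical, not the Proton parser) of why the quoted guard only makes sense inside a parse loop, where `periodic_interval` can already have been set by a previous iteration:

```cpp
#include <stdexcept>
#include <string>
#include <vector>

/// Hypothetical, simplified EMIT-clause scan: a second 'PERIODIC' sub-clause
/// in the same EMIT clause is rejected, which requires `periodic_interval`
/// to have been populated on a previous loop iteration.
std::string parseEmitClause(const std::vector<std::string> & tokens)
{
    std::string periodic_interval;

    for (size_t i = 0; i < tokens.size(); ++i)
    {
        if (tokens[i] == "PERIODIC")
        {
            if (!periodic_interval.empty())
                throw std::runtime_error("Can not use repeated 'PERIODIC' in EMIT clause");
            if (i + 1 >= tokens.size())
                throw std::runtime_error("'PERIODIC' requires an interval");
            periodic_interval = tokens[++i]; /// e.g. "5s"
        }
        /// ... other EMIT sub-clauses would be handled here ...
    }

    return periodic_interval;
}
```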
/// 'months' - 'M'
/// 'quarter' - 'q'
/// 'year' - 'y'
class ParserIntervalAliasExpression : public IParserBase
I think before making this feature non-experimental, we have to unify the parsing of these expressions and other interval expressions (e.g. how TTL can be specified). But this is not a blocker to merge this PR.
This is a good idea. We should have a consistent way to handle interval aliases like `5s` vs `INTERVAL 5 SECONDS`, and there are other aliases, like the JSON shortcut `jsonstring:a.b.c::int` vs `jsonExtract(jsonstring, "a.b.c")`, etc.
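As a concrete illustration of the alias shorthand being discussed, a small self-contained sketch (not the PR's ParserIntervalAliasExpression) that maps an alias such as 5s or 2h to a (value, unit) pair. The suffixes 'M', 'q', 'y' come from the quoted comment; the smaller units are assumptions for illustration:

```cpp
#include <cctype>
#include <cstdint>
#include <optional>
#include <string>
#include <utility>

/// Parse "5s", "2h", "3M" etc. into (value, unit-suffix); returns nullopt on
/// anything that is not digits followed by exactly one known unit character.
std::optional<std::pair<int64_t, char>> parseIntervalAlias(const std::string & s)
{
    size_t i = 0;
    while (i < s.size() && std::isdigit(static_cast<unsigned char>(s[i])))
        ++i;

    /// Need at least one digit followed by exactly one unit character.
    if (i == 0 || i + 1 != s.size())
        return std::nullopt;

    const char unit = s.back();
    if (std::string("smhdwMqy").find(unit) == std::string::npos)
        return std::nullopt;

    return std::make_pair(std::stoll(s.substr(0, i)), unit);
}
```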
@@ -275,6 +280,24 @@ ASTPtr StorageView::restoreViewName(ASTSelectQuery & select_query, const ASTPtr
    return subquery->children[0];
}

bool StorageView::isStreamingQuery(ContextPtr query_context) const
@KochetovNicolai is it okay to depend on the new Analyzer in the storage layer?
Yes, I think it is ok.
We can support streaming with only the new analyzer if needed.
NONE,
TAIL,
PERIODIC
Are you planning to introduce more emit modes? If not, then as far as I understand only `PERIODIC` does anything; `TAIL` and `NONE` don't do anything, so it would be better to remove them. Furthermore, based on the `InterpreterSelectQuery::buildStreamingProcessingQueryPlanAfterJoin` function, I think we only use the watermarks in the case of `PERIODIC` queries, therefore WatermarkStamper won't be used with `NONE` or `TAIL` (if I am wrong, please explain how it can be used).
Yes, we will introduce more emit modes like `EMIT CHANGELOG`, `EMIT AFTER WATERMARK`, and `EMIT AFTER WATERMARK AND DELAY 5s`, so more WatermarkStampers will chime in in later PRs.
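For orientation, a hypothetical sketch of how the emit-mode enum could grow; only NONE, TAIL and PERIODIC exist in this PR, and the names for the future modes are illustrative, not from the PR:

```cpp
/// Hypothetical extension of the emit modes quoted above.
enum class EmitMode
{
    NONE,
    TAIL,
    PERIODIC,
    CHANGELOG,             /// EMIT CHANGELOG (future PR)
    AFTER_WATERMARK,       /// EMIT AFTER WATERMARK (future PR)
    AFTER_WATERMARK_DELAY, /// EMIT AFTER WATERMARK AND DELAY <interval> (future PR)
};
```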
#include <Columns/IColumn.h>
#include <unordered_map>

namespace DB
{

struct ChunkContext
I think in our codebase `Context` is very overloaded; from `ChunkContext` I associate to the `Context` object which is used in different places as the global/session/query context. Maybe `ChunkWatermarkInfo` or something without `Context` would be a better name. This is a very minor thing, feel free to ignore.
We considered a similar thing in the Proton code base as well, but as time went on we stuffed more things in there, like the checkpoint context, so we finally decided on ChunkContext. Agreed, Context is overloaded.
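For illustration only, a hypothetical sketch (field names invented here, not from the PR) of why a generic name was kept: the per-chunk context is expected to carry more than the watermark over time, e.g. checkpoint information:

```cpp
#include <cstdint>

/// Hypothetical sketch, not the PR's definition of ChunkContext.
struct ChunkContextSketch
{
    int64_t watermark = 0;      /// event-time progress carried with the chunk
    uint64_t checkpoint_id = 0; /// checkpoint info (future PR); invented field
};
```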
bool updated = false;
auto new_watermark = chunk.getWatermark();
if (new_watermark > input_with_data.watermark || (input_with_data.watermark == TIMEOUT_WATERMARK && new_watermark >= aligned_watermark))
How can `new_watermark > input_with_data.watermark` be false?
When we do alignment, watermarks can come from different data sources, so it is possible that `new_watermark` comes from a fast input.
assert(num_inputs > 0);
}

IProcessor::Status ShrinkResizeProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & /*updated_outputs*/)
I need a bit of explanation about watermarks to understand this and the updateAndAlignWatermark functions.
- Is it expected that the different inputs have the same source of watermarks, i.e. a single WatermarkStamper before an ExpandResizeProcessor?
- Can data from different watermarks be mixed? E.g. we have a 5s periodic emit query. Can chunks before and after a watermark be sent to the output without sending the watermark? Isn't the goal of watermarks to keep the data and the outputted result "in sync"?
Thanks Kovi for the review and comments again. Answers inlined here.

> I need a bit of explanation about watermarks to understand this and the updateAndAlignWatermark functions.
>
> - Is it expected that the different inputs have the same source of watermarks, i.e. a single WatermarkStamper before an ExpandResizeProcessor?

Different inputs have different sources of watermarks (timestamps). Watermarks are stamped (observed) independently for each input in the code. However, when we reach Resize, Aggregate, or Join operators, we need to align them (which fundamentally means aligning the timestamps - what time it is now). After alignment, we have established a timestamp for the downstream query pipeline.

> - Can data from different watermarks be mixed? E.g. we have a 5s periodic emit query. Can chunks before and after a watermark be sent to the output without sending the watermark? Isn't the goal of watermarks to keep the data and the outputted result "in sync"?

Watermarks can be mixed and will need re-alignment when we hit a Resize etc. operator, as mentioned above. The sole goal of a watermark is answering "what time is it", and all of the aggregation / join emits, state GC, etc. are based on the watermarks.

We need to do watermark alignment when watermarks are mixed and shuffled. The current strategy is std::min(watermarks_from_inputs) of a processor. For an idle or slow input, we can choose either 1) waiting, which stalls the pipeline, or 2) a heartbeat watermark which pushes the watermark progress forward. In this PR, we have the timeout/heartbeat watermark (see TIMEOUT_WATERMARK in the code above). We will have some mechanism to handle late / out-of-order events in future PRs.

Watermarks, which are fundamentally timestamps, are one of the fundamental things in streaming query processing / analytics. Proton borrowed lots of concepts from Flink but implemented them in a very different way. There are quite a few academic papers which influenced Proton's implementation. You can check this blog post for the details: https://www.timeplus.com/post/unify-streaming-and-historical-data-processing
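To make the alignment strategy above concrete, a minimal self-contained sketch (hypothetical helper, not the PR's ShrinkResizeProcessor; the sentinel value and skipping rule are illustrative assumptions): the aligned watermark is the minimum across inputs, while inputs that only produced a timeout/heartbeat watermark are skipped so they do not stall progress:

```cpp
#include <algorithm>
#include <cstdint>
#include <limits>
#include <vector>

/// Illustrative sentinel for an idle / heartbeat-only input.
constexpr int64_t TIMEOUT_WATERMARK_SENTINEL = -1;

/// Min-based watermark alignment across a processor's inputs.
int64_t alignWatermarks(const std::vector<int64_t> & input_watermarks)
{
    int64_t aligned = std::numeric_limits<int64_t>::max();
    bool any_active = false;

    for (int64_t wm : input_watermarks)
    {
        if (wm == TIMEOUT_WATERMARK_SENTINEL)
            continue; /// idle input: do not let it hold back the pipeline

        aligned = std::min(aligned, wm);
        any_active = true;
    }

    /// If every input is idle there is no meaningful watermark to propagate yet.
    return any_active ? aligned : TIMEOUT_WATERMARK_SENTINEL;
}
```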
Changelog category (leave one):
- New Feature

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Streaming aggregation over Kafka stream via the `EMIT STREAM` clause. Aggregation over the Kafka table engine is also supported.

Documentation entry for user-facing changes:
Streaming aggregation over Kafka stream. This closes feature #54776, which contains some detailed discussion of this feature.
Stream processing over the Kafka storage engine is opted into via `EMIT STREAM`. We need the opt-in for backward compatibility with the existing behaviors of the Kafka table engine.

Examples:
Input formats which are supported by the existing Kafka table engine stay as they are.