
Global Streaming Aggregation over infinite stream data in Kafka #54776

Open
chenziliang opened this issue Sep 19, 2023 · 4 comments

@chenziliang
Contributor


Use case
Streaming Query / Aggregation over Kafka topic


Describe the solution you'd like

This issue targets porting the first batch of existing stream processing functionality from Proton, the Timeplus streaming analytics open source repo, to the ClickHouse community.

It is also the first batch implementing the Streaming Query RFC: allow aggregation and joining over infinite streams of data. Such streaming queries behave quite differently from regular historical queries, since they emit (intermediate) query results according to a “watermark”.

Syntax / Semantics

We can either leverage the existing ClickHouse Kafka table engine or introduce a separate external stream concept like Timeplus Proton does

If we decide to introduce a new stream concept, the provisioning SQL will look something like this:

CREATE EXTERNAL STREAM kafka_stream (raw String) SETTINGS type='kafka', brokers='localhost:9092', topic='github_events', ...

Otherwise we can keep using the existing ClickHouse Kafka table engine, which may be problematic regarding offset checkpointing etc. during query recovery.

No matter which storage engine we adopt, once the table pointing to a Kafka topic is provisioned, users can run different queries against this special table using regular ClickHouse query syntax, but with the streaming processing semantics detailed below.

Flat Transformation Query

The query example below runs forever, since the stream data lasts forever. It first rewinds the offset to the earliest data to replay all of the available history, then extracts JSON fields from each GitHub event and emits the parsed fields to end users continuously.

SELECT raw::assignee, raw::user.login, raw::title, raw::created_at FROM kafka_stream SETTINGS seek_to='earliest';

Global Aggregation Query

The following query example continuously evaluates real-time data from the Kafka topic and emits the top 10 contributors every 5 seconds.

SELECT topK(10)(raw::user.login) AS top_contributors FROM kafka_stream EMIT periodic 5s;

Data Enrichment Join

Like the data enrichment join in Timeplus, this is an infinite stream joined against static historical table data.

In this query, users_dim is a regular ClickHouse (Replicated/Shared) MergeTree table. Once users_dim is loaded and its hash table is built, it stays static (no future updates to the hash table in this case). The left stream continuously joins against the right static hash table as new events arrive on the left.

SELECT *, raw::user.login as user_id FROM kafka_stream INNER JOIN users_dim ON user_id = users_dim.id;

Global Aggregation after Data Enrichment Join

As with streaming global aggregation, users can run a global aggregation over the streaming data enrichment join. The user_nt_name in the following query comes from the users_dim dimension table.

SELECT topK(10)(user_nt_name) FROM 
  (
    SELECT *, raw::user.login as user_id FROM kafka_stream INNER JOIN users_dim ON user_id = users_dim.id
  )  AS enriched
EMIT periodic 5s;

Please note that for all of the above example queries, streaming query semantics are enabled automatically and only for the Kafka table engine (or external stream, if we create a new concept). There are no opt-in tuning knobs to enable this streaming behavior, since streaming is probably the most natural query semantics for the Kafka table engine.

Backward Compatibility

The above query syntax / semantics shall not conflict with the existing Kafka table engine's query behavior; they are purely additive enhancements.


@chenziliang
Contributor Author

cc @KochetovNicolai

@KochetovNicolai
Member

Syntax / Semantics

The concept of EXTERNAL STREAM looks good. I think we will need it eventually. However, it would be easier to start with the existing StorageKafka.

Flat Transformation Query

For the query example, it is interesting why a special syntax like raw::assignee is needed. What else can be used instead of raw::?
Ideally, I prefer a streaming query syntax to be almost identical to normal query syntax, with a minimal number of changes. So that a user can replace SELECT ... FROM ... with SELECT ... FROM STREAM ... (or maybe just STREAM ... FROM) and get a streaming result.
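As a purely hypothetical sketch of that idea (STREAM is not existing ClickHouse syntax, and this is only an illustration of the proposal above), the only change versus a normal query would be one keyword:

```sql
-- Hypothetical syntax sketch, assuming a STREAM keyword were added.
-- A normal one-shot historical query ...
SELECT count() FROM kafka_stream;

-- ... would become a continuous streaming query:
SELECT count() FROM STREAM kafka_stream;
```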

Please note for all of the above example queries, we enable streaming query semantics automatically and only for the Kafka table engine (or external stream if we like to create a new concept).

^ This looks good to me.

Global Aggregation Query

Here, I see a new keyword: EMIT periodic 5s. Is it better than adding a list of specific settings for streaming queries?
I suppose it might be, e.g. if different parts of the query may require a different emit period. (Is there any example?)

@chenziliang
Contributor Author

chenziliang commented Sep 19, 2023

Thanks @KochetovNicolai for your comments.

Syntax / Semantics

The concept of EXTERNAL STREAM looks good. I think we will need it eventually. However, it would be easier to start with the existing StorageKafka.

Sounds good. Let's stick to StorageKafka engine for now.

Flat Transformation Query

For the query example, it is interesting why a special syntax like raw::assignee is needed. What else can be used instead of raw::? Ideally, I prefer a streaming query syntax to be almost identical to normal query syntax, with a minimal number of changes. So that a user can replace SELECT ... FROM ... with SELECT ... FROM STREAM ... (or maybe just STREAM ... FROM) and get a streaming result.

  1. raw::assignee is syntax sugar for extracting fields from JSON text instead of calling json_extract(raw, ...) explicitly. The JSON path can be nested several levels deep, like raw::a.b.c to extract the nested JSON field c. More details / usages are recorded in this doc.
  2. Sounds good. For now, for StorageKafka queries, let's avoid requiring users to specify the STREAM keyword. We can extend the AST with STREAM later for other scenarios (other storage engines).
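For illustration, the equivalence in point 1 could be sketched as follows (JSONExtractString is an existing ClickHouse function; the raw:: shorthand is the proposed sugar and does not exist in ClickHouse today):

```sql
-- Proposed syntax sugar:
SELECT raw::user.login FROM kafka_stream;

-- Roughly equivalent explicit form using an existing ClickHouse
-- JSON function (the nested path is passed as separate keys):
SELECT JSONExtractString(raw, 'user', 'login') FROM kafka_stream;
```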

Please note for all of the above example queries, we enable streaming query semantics automatically and only for the Kafka table engine (or external stream if we like to create a new concept).

^ This looks good to me.

Global Aggregation Query

Here, I see a new keyword: EMIT periodic 5s. Is it better than adding a list of specific settings for streaming queries? I suppose it might be, e.g. if different parts of the query may require a different emit period. (Is there any example?)

The EMIT clause has other usages in Timeplus, such as EMIT CHANGELOG, EMIT ON WATERMARK, and EMIT ON WATERMARK WITH DELAY 2s. These are behavior-modifier clauses that control when we push the (intermediate / finalized) results to end users, and with what kind of semantics, like CHANGELOG or APPEND (append is the default). EMIT ON WATERMARK etc. are used for window aggregation: fundamentally, when a window closes, we push its result to end users, or delay a bit to honor late events (rows). And if we want a subquery to emit at a different pace, users can specify the EMIT clause on the subquery. You can find more details / examples regarding the EMIT clause here.
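For illustration, a window aggregation using these EMIT variants might look like the following Timeplus-style sketch (the tumble() table function and the EMIT clause are Timeplus syntax, not existing ClickHouse SQL):

```sql
-- Timeplus-style sketch, not existing ClickHouse syntax:
-- count events per 5-second tumbling window and emit each window's
-- result once the watermark passes the window end, waiting an extra
-- 2 seconds to honor late events.
SELECT window_start, count() AS events
FROM tumble(kafka_stream, 5s)
GROUP BY window_start
EMIT ON WATERMARK WITH DELAY 2s;
```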

BTW, Timeplus's streaming query syntax / semantics are based on / derived from academic research and industry practice. You can find more information in this blog.

Using ClickHouse settings to control these behaviors would technically work as well, I think. Since Timeplus is a streaming-first solution, we adopted the streaming industry practices from day one.

@chenziliang
Contributor Author

chenziliang commented Sep 21, 2023

Tackling this feature in PR #54870
