
docs: klip-11: Redesign KSQL query language #3799

Open · wants to merge 5 commits into master
Conversation

Member

@mjsax mjsax commented Nov 8, 2019

Author: Matthias J. Sax, @mjsax |
Release Target: 5.5+ |
Status: In Discussion |
Discussion: link to the design discussion PR

tl;dr: KLIP-8 adds the ability to query table state, similar to RDBMS style queries. To allow querying tables in a "streaming manner", i.e., to receive a table's changelog stream as the query result, the new keyword EMIT CHANGES is introduced. However, adding this new keyword does not seem to be a holistic solution. We propose to redesign the query language to allow a more native way to support different types of queries over streams and tables. The proposed design is based on the stream-table duality idea, and tries to embed this duality into the query language itself.

@mjsax mjsax requested a review from a team as a code owner November 8, 2019 02:02

ghost commented Nov 8, 2019

@confluentinc It looks like @mjsax just signed our Contributor License Agreement. 👍

Always at your service,

clabot

Contributor

purplefox commented Nov 8, 2019

Thanks! Btw, I think it's easier to view the proposal like this https://github.com/confluentinc/ksql/blob/888691db3d43e508ee1d87095288166117b501e2/design-proposals/klip-11-DQL.md instead of copy and pasting into the PR description :)

Member Author

mjsax commented Nov 8, 2019

I did what KLIP-10 did... I am new to writing KLIPs :)

Contributor

@agavra agavra left a comment

Thanks @mjsax! I think this brings up a lot of good points that we need to think about.

Something that I feel is missing from this KLIP is evaluation criteria for what we consider a successful outcome. We have lots of discussions around query semantics, and because they're so fundamental and accessible, they often spiral out of control. To address this, we need some way to make trade-offs and make tough calls. I'm also very wary of optimizing for "future-proofness" when we need to make the language intuitive and understandable today.


The focus is to remove `EMIT CHANGES` and introduce new syntax to support different types of queries over streams and tables.
We want to exploit the stream-table duality and to transform STREAMS and TABLES into each other seamlessly.
Furthermore, the query language should express the difference between TABLES and MATERIALIZED VIEWS explicitly.
Contributor

I don't think ksqlDB should differentiate between materialized views and tables. In an RDBMS, this distinction is necessary to distinguish between tables that are created off a changelog topic and those that are not. In ksqlDB, I suspect that all tables will be backed by a changelog topic (we don't want to get into RDBMS land, that's a losing proposition). Why should we differentiate between a changelog topic that is being produced by a query and one that is just being inserted into manually by an application? If we want to implement INSERT/UPDATE/DELETE, we simply implement them as ad-hoc inserts into the changelog topic.

Beyond that, I think it would be confusing to most users starting out with KSQL to talk about Stream/Table duality and then not have the top level concept of a "Table". I can imagine the user experience of having all CREATE TABLE statements failing with "not yet implemented." This is the same frustration I'm getting with EMIT CHANGES today - we thought very hard about making it future proof at the expense of the experience today and I think we should avoid doing that again.

Member Author

You raise a fair point; however, I might not have covered all aspects of this question. There are currently two ways a TABLE is created in ksqlDB:

  1. from a topic
  2. as the result of a CTAS statement

For case (1), INSERT/UPDATE/DELETE might be something we could allow by writing into the topic -- however, we lose read-after-write consistency, which seems to be an issue. For case (2), however, it would (from my point of view) just be wrong to allow INSERT/UPDATE/DELETE.

Thoughts?

Contributor

  1. For the case where it's created from a topic without a corresponding KSQL query, the only way to write into it is by writing into the underlying topic, whether that's done via KSQL (INSERT statement) or by some other client. So we don't really change anything by exposing this; we're just allowing the user to interact with one fewer tool.
  2. For this case, I agree that at the surface level it feels very icky to insert into the underlying stream - but I don't think the model should prevent a user from doing this. I'd argue that, if anything, it strengthens the mental model. Every table is now just created on a topic, and there's no distinguishing whether it was created inside KSQL by a query or otherwise. A DML is a mix of a topic creation operation and a DDL to declare it as a table/stream. In my mental model, the "type 1" table is really just a table created over a topic where we don't know what's writing into it (a query unknown to KSQL).

Long story short, I don't see a difference between 1 and 2 - in both scenarios it's a table created from a topic.

Member Author

@mjsax mjsax Nov 22, 2019

I think there is a difference between the two. I understand your point about (1). I'm not sure your model is the best way to think about it (but I would like to exclude this question for now). However, point (2) is a different question that overlays some arguments from (1) with something else. Hence, let me try to split (2) into two pieces:

(2a) If we aggregate a stream, the aggregation operator needs state for the computation. In systems like Flink with no TABLE abstraction, this state is an internal implementation detail, and the result of the aggregation is a stream. In Kafka Streams, the internal state is promoted to be a first-class citizen: the result of the computation is a TABLE.

(2b) However, this "promotion" of the internal state to a TABLE can be seen as an optimization: instead of maintaining internal state and a result TABLE, both are "merged" together, as we know that both contain the same data. Hence, a stream aggregation that returns a table can be decomposed into two steps: first, a Flink-like aggregation operator with internal state that emits a result stream, and second, an "upsert" operation that populates the result TABLE.

Hence, for point (2) we need to conceptually distinguish between the operator state and the result TABLE. I would argue that it should not be allowed to modify the internal state of an aggregation operator. Because the internal operator state and the result table are merged, we cannot allow INSERT/UPDATE/DELETE on the result table. Otherwise, we break the aggregation operator.

If I understand your argument correctly, you are saying we should consider the internal operator state and the result TABLE independent, and thus we can allow inserting into the result TABLE. Thus, my conclusion would be: if we want to allow this, we need to split the operator state and the result TABLE into two entities. The aggregation maintains its internal state, and each result record is treated as an UPDATE to the result TABLE. Because state and table are independent now, we can also allow INSERT/UPDATE/DELETE records from "the side".

Is this a fair way to put it?
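For illustration, the decomposition above could be written in this KLIP's proposed syntax roughly as follows (a sketch only: `s` is a hypothetical input stream, the aggregation returns a stream per the proposed semantics, and `TABLE()` is the proposed upsert operator, none of which is implemented):

```sql
-- step 1: the aggregation emits a result stream (proposed semantics)
-- step 2: TABLE() upserts that stream into a (temp) table
SELECT * FROM TABLE(SELECT k, count(*) FROM s GROUP BY k);
```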

Member Author

To finally address (1): I prefer the view (pun intended) that CREATE TABLE FROM <topic> creates a MV into which one should not be allowed to INSERT/UPDATE/DELETE, because ksqlDB does not "own" the topic.

  • an existing topic is written from "external", hence ksqlDB is a consumer of this topic (not a producer)
  • if we introduce RDBMS style tables (that would also be backed by a topic), the underlying topic would be locked down for external writes, ie, all modifications must be done via ksqlDB -- ksqlDB would be the "owner" of the topic and the only producer

An alternative might be to use ACLs for this case and say: if an external topic is locked down for writes, we also disallow INSERT/UPDATE/DELETE -- but to me, this seems to fall short. IMHO, it's "cleaner" to only create MVs from topics, and hence the ACL question does not arise to begin with.

To be fair, I have limited data points on this question. Maybe you know more? If people request this feature, what is their use case, and can we make it work in a sane way? Aside from the missing read-after-write consistency (which is a minor concern), I am still unclear why people want to be able to insert/modify "external" data (or do people actually not realize what they ask for, and actually think the TABLE is their "private" copy of the data?).

Contributor

I think that explanation makes sense - thanks for the time @mjsax. I think #3954 also reasons out to a similar conclusion.

Contributor

we could allow by writing into the topic -- however, we lose read-after-write consistency, which seems to be an issue.

IMO we should first have this discussion on what sort of guarantees we would want for the writes from the application. Not sure if we have thought that far ahead already.

Hence, for point (2) we need to conceptually distinguish between the operator state and the result TABLE.
Thus, my conclusion would be: if we want to allow this, we need to split the operator state and the result TABLE into two entities

I understand what you are saying... but what exactly will break the operator? Thinking out loud: is it the difference between the result table schema and the operator state's schema? E.g., if you did a select a,b,c from t group by k having ..., the result table has just three columns a, b, c, but the operator state can have additional columns?

Contributor

Also FYI: allowing INSERT/UPDATE/DELETE gets a little tricky (see #3585).

Member Author

IMO we should first have this discussion on what sort of guarantees we would want for the writes from the application.

That is fair. However, I am not sure if it must be part of this KLIP?

I understand what you are saying... but what exactly will break the operator?

It's not about the schema. If I do select count(*) from stream group by key, and I have <k1,...> <k1,...> <k2,...>, the result must be a table with two rows, <k1,2> and <k2,1> -- if I insert and change the value for k2 to 5, the result table is now incorrect. Hence, we should not allow it. The output (that is a MV) must reflect the result of the computation. The relationship of the base stream and the derived output table is defined by the query and must be obeyed. It's the same thing in an RDBMS: if you define a MV over a table, you cannot insert into the MV -- and the relationship between the base table and the MV is defined by the view definition (and a view definition is expressed as a query). We allow defining MVs over input streams and input tables via "persistent queries", but I don't really think there is a "persistent query" as a standalone entity. A CTAS statement is nothing else but a view definition IMHO.
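To make the example concrete, a minimal sketch (hypothetical stream `s`; the records and counts are the ones from the comment above):

```sql
SELECT k, count(*) FROM s GROUP BY k;
-- input records: <k1,...> <k1,...> <k2,...>
-- result table:  <k1,2> <k2,1>
-- an out-of-band write setting k2's count to 5 would contradict the
-- result defined by the query, so it must not be allowed
```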

Contributor

I'm with @mjsax here. We shouldn't be allowing INSERT/UPDATE/DELETE into the source and sink topics that KSQL uses (which is how INSERT VALUES is currently implemented), because it won't do what users expect.

However, I think this conversation would be better placed in the more targeted issue: #3954


We propose the following language changes:

* introduce `TABLE(<stream>)` operator: takes a STREAM as input, and changes its semantic interpretation from facts to updates; it returns a TABLE
Contributor

+1 - I think a lot of users have been asking for this type of functionality, and have worked around it by declaring a table on the underlying topic.

Member

You introduce both TABLE(), which creates a materialized view, and CREATE MATERIALIZED VIEW, which also creates a materialized view? I thought you define TABLE to be something else and replace the current table with MV?

Member Author

That is a fair point @vpapavas -- technically, the operator should be called MATERIALIZED_VIEW(<stream>).

Contributor

I did get confused by this myself when I was first reading through. However, @mjsax is saying both a TABLE and a MATERIALIZED VIEW can be treated the same when it comes to querying them, so there isn't any difference in that sense.

While I got confused briefly (it's easily done), I think this was because I was reasoning about the internals, rather than just using the syntax. I actually think table() works syntactically, even if it's not fully accurate.

Would it always be the case that the stream passed to TABLE() would be materialized in some way? If not, then switching to MATERIALIZED_VIEW might be misleading.

Member Author

That's a good point; it's up to the optimizer to materialize or not. However, even if materialized, reading/writing from "outside of the query" would not be possible, because the operator creates a TEMP TABLE and thus the table does not have a name that is exposed to the user.

Contributor

If we had a UDAF called LATEST(), we could create a table from a stream through a LATEST aggregation (cf. #3985). How would this be different from the TABLE() operator?

(In an earlier discussion, the use of such a TABLE() operator, implicit or explicit, was discouraged by many participants, who favored the approach of an aggregation via LATEST.)

Member Author

I am frankly going back and forth myself about this question. While I did favor LATEST() at some point, I am not sure any longer if an aggregation is the right abstraction, because I think an aggregation does not model deletes properly. An aggregation first builds groups of records (in our case, substreams) and applies the aggregation function per group/substream. However, a delete would imply that the whole group should be removed. This seems to "break" the semantic model of an aggregation?
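For context, the alternative under discussion would look roughly like this (a sketch only; LATEST is the UDAF proposed in #3985 and was not implemented at the time):

```sql
-- derive a table from a stream via a LATEST() aggregation
CREATE TABLE current_state AS
  SELECT id, LATEST(val) AS val
  FROM input_stream
  GROUP BY id;
```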

* introduce `STREAM(<table>)` operator: takes a TABLE as input, and returns a STREAM that contains a record for each update to the TABLE
* note: the returned STREAM is a fact stream, and would be processed with corresponding STREAM semantics (it's semantically not a changelog stream)
* note: the input TABLE can either be an RDBMS style table or a MV
* by default, the output STREAM contains one record for each input table record (full table scan on the table generation when the query was issued)
Contributor

is this a "bootstrapping" operation or is it just a re-interpretation of the existing topic? It probably needs to be bootstrapping operation to make sure that it's non-compacted 🤔

Member Author

It depends how the table is stored (but this seems to be an implementation detail). If the table is not materialized, ie, CREATE TABLE FROM <topic>, then yes, the table would need to be bootstrapped from the changelog, and for each unique key, exactly one output record goes into the stream (hence, it's not a re-interpretation of the "input changelog"; we do an "in-memory compaction on the fly"). Only afterwards would each update be emitted.

If the table is actually materialized already, ie, the result of a CTAS, we don't need to bootstrap it; we do a full table scan, emit one record per key, and afterwards emit one record per update.

Does this make sense?

Contributor

I think my question wasn't worded properly. What I was trying to understand is if STREAM(<table>) always creates a new stream or can simply re-use the old stream. It seems that in all of your examples it needs to create a new stream and I think that makes sense, but is unfortunate from a storage perspective.

Member Author

Conceptually, it's a new STREAM.

Your comment about the "storage perspective" seems important, but it should not impact the design of the language IMHO. We of course need to figure out how we can implement this efficiently and "share" data if we need to materialize those STREAMs in topics. However, I would assume that the created STREAM is often not materialized, and that STREAM(<table>) is used to express nested queries.

Contributor

I agree that the storage should not impact the language, but what I was trying to confirm is if it's something conceptually different than a changelog topic - which it is. This means that a subscription on a table, and a subscription on a STREAM(<table>) are two different things.

Member Author

To be clear here, you're proposing that STREAM(someTable) will return a stream containing one row for each existing row in the table, followed by one row per update to the table?

Correct.

How would tombstones be handled? e.g. if a row is deleted from the table it will emit a record with a null value. Currently, KSQL will handle this OKish: I think it treats a null value in a stream the same as a value where every column is null. However, if / when we have the concept of NOT NULL we may run into issues.

I am aware of this issue, and I agree that the current approach of how KSQL handles nulls/tombstones needs to be improved. I don't have final thoughts on it, as I have not spent time thinking about it in detail. This is an existing problem, though, and thus somewhat orthogonal to this KLIP from my point of view. I also want to point out that SQL itself has its own way to define NULL semantics (and those are not always intuitive), and if we want to be compliant with ANSI SQL, we should make sure we follow those rules (hence, the current "all values are NULL == tombstone" seems to be wrong).

How we handle nulls/tombstones in ksqlDB, as it's a non-standard "feature", is an open question to me, and I think we need to design this from scratch. It seems we never had a deep discussion on this issue. Also, there is a difference between streams and tables here, as streams don't have delete semantics but might contain <key,null> records (as kv-pairs). With the "generic key" support that we are building, this question becomes even more interesting. Also, `table(<stream>)` should handle this case in a sane manner, and I think that the input stream records `<key,null>` and `<key,>` should be two different records (the latter would be a row with a NULL value, while the former would be a delete/tombstone if the stream is upserted via `table()`). These are only raw thoughts about some questions we need to address to tackle this issue.

Contributor

(hence, the current "all values are NULL == tombstone" seems to be wrong)

That's not how it works... as I'm sure you're aware, a tombstone is a record with a null value. If converted to a stream, KSQL would currently deserialize that as a row with null column values.

One thought is that maybe stream(table) would return a stream of events with a schema that explicitly exposed the CDC nature of the stream.

Given a table schema of:

   k0 INT PRIMARY KEY,  -- primary key, implicitly NOT NULL
   v0 INT NOT NULL,     -- non-null value column
   v1 STRING            -- nullable value column

The schema returned by stream() would be:

   k0 INT PRIMARY KEY,  -- key column (streams don't have primary keys), implicitly NOT NULL
   v0 INT,              -- value column, now nullable
   v1 STRING,
   TOMBSTONE BOOLEAN,   -- true if the record is a tombstone; all other value fields are null
   UNDO BOOLEAN         -- true if undo; value columns have the updated values

Specifically,

  • all NOT NULL value columns are converted to nullable.
  • An additional tombstone column is present, which is set to true on a delete (all value columns are null).
  • An additional undo column is present, to indicate the output is due to an undo operation, rather than a straight update.

Alternatively, the tombstone record could include all the value column values that are being deleted, thereby allowing value columns to remain NOT NULL. (Not sure how possible this is with KS.)

Importantly, defining such details of how stream() works needs, IMHO, to be covered in this klip.

Member Author

What do you mean by

k0 INT PRIMARY KEY,  -- key column (streams don't have primary keys), implicitly NOT NULL

The PRIMARY KEY definition and your comment "streams don't have primary keys" don't seem to align?

Why do you want to convert NOT NULL columns to nullable columns? (For my own education: to what extent is NOT NULL actually supported in KSQL atm? For a CREATE TABLE from <topic>, would records that violate a NOT NULL definition simply be skipped? What about CTAS, if the query returns a record that violates the definition?)

Can you elaborate on undo? It's unclear to me what this is. What is an undo operation?

The other question is about the table(<stream>) operator: assume a user creates a STREAM that is upserted into a MV. How could a user insert a tombstone into the STREAM to delete from the MV? Does the schema you propose above imply that a delete can only be inserted if there is a tombstone column in the input STREAM of the table() operator?

Contributor

What would be the difference between STREAM(myTable) and SELECT * FROM myTable EMIT CHANGES?

Member Author

They would both be the same, given the current proposal for how SELECT * FROM myTable EMIT CHANGES should work (note that the current implementation -- afaik -- does not yet implement what KLIP-8 proposes).
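In other words, under the proposal the following two statements would be equivalent (the STREAM() form is the KLIP's proposed syntax, not implemented):

```sql
SELECT * FROM STREAM(myTable);
SELECT * FROM myTable EMIT CHANGES;
```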

Comment on lines +164 to +166
* Drop the terms `pull query` and `push query`
* Use the terms `table query` and `stream query`
(this will also allow using the phrases "query a stream" and "query a table" in a straightforward way)
Contributor

I don't think I agree with this. I think the distinction of "subscribing" vs. "querying" makes a lot of sense, independent of the input type of the query. I can see both subscriptions and queries on tables (less so on streams), which means that the table/stream query distinction breaks down.

Member Author

I would phrase it differently: if you have a table, you can either query its state, or you subscribe to its changelog stream. The former is "querying", while the latter is "subscribing". However, you don't subscribe to the TABLE, but to its changelog, ie, you first create a stream, via STREAM(<table>) and then subscribe to it.

For streams, you can only subscribe to them.

Does this make sense?
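A sketch of the distinction in the proposed syntax (hypothetical table `t`; `STREAM()` is the operator proposed in this KLIP, not implemented):

```sql
-- query the table's current state (terminates)
SELECT * FROM t WHERE id = 'alice';

-- subscribe to the table's changelog (never terminates)
SELECT * FROM STREAM(t);
```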

Contributor

@agavra agavra Nov 21, 2019

I can be convinced - I think what you're saying makes sense that you don't "subscribe to a table" but rather to the changelog of it.

I'm just considering that a "push query" (indicated by something like EMIT ...) would do the cast from a table to a changelog stream implicitly for you. That way you don't run into the concern I raised here: #3799 (comment)

This would be materially different from STREAM(<table>) because it could reuse the underlying topic.

Member Author

Not sure I understand your last point. Can you elaborate? What does EMIT allow you to do that you cannot do via STREAM(table)?

Contributor

I brought up a similar aspect during the EMIT CHANGES discussion: just keep one mode per entity (stream/table). I think the reasoning for the pushback was that we already support subscribing to a table. An extra conversion to STREAM(table) will simplify the language. +1

Contributor

@miguno miguno Dec 16, 2019

However, you don't subscribe to the TABLE, but to its changelog, ie, you first create a stream, via STREAM(<table>) and then subscribe to it.

Temporal tables, a standard SQL feature, blur the lines between a "current value only" TABLE (which is what @mjsax refers to here) and the history or changelog of that table.

I think we should re-evaluate KLIP-11 in a way that aligns more with the existing, non-streaming SQL standard in this context.

I am with @agavra: The query behavior of 'push vs. pull' is (and should be) orthogonal to whether a 'stream' or a 'table' is being queried.

Member Author

Not sure why temporal tables would be a problem? You just refer to an older version of the table via the AS OF <timestamp> clause that is mentioned in the KLIP. If you want to get the table as of yesterday and stream its changelog since then, you do STREAM(table AS OF <timestamp>) or the full syntax:

SELECT * FROM STREAM(SELECT * FROM table AS OF <timestamp>)

Btw: I had a couple of in-person discussions and (I think) gained a better understanding of what "push and pull" is supposed to mean (which is quite unclear from KLIP-8 and the current docs, unfortunately)... I need to think about the problem again, and it might change this discussion...


```sql
[CREATE MATERIALIZED VIEW resultView AS] SELECT count(*) FROM inputStream GROUP BY <some-grouping-condition>
-- returns a STREAM (!!!): different and not different to current KSQL semantics... compare below
Contributor

intuitively, this makes a lot of sense to me - practically, I think it's a little murky. To perform an aggregation it is necessary to materialize state in some form of table. How can I access that table? If I need to re-declare it over the changelog topic we need to have a really smart optimizer to know to leverage the existing one - and this metadata might be lost when I move between KSQL clusters.

Member Author

To perform an aggregation it is necessary to materialize state in some form of table. How can I access that table?

You can't, because it is a TEMP TABLE -- RDBMSs have the same concept.

If I need to re-declare it over the changelog topic we need to have a really smart optimizer to know to leverage the existing one

Yes, but this seems to be standard query optimization.

and this metadata might be lost when I move between KSQL clusters.

I cannot follow here. Can you elaborate?

Contributor

I agree, it's a temp table, (or hidden state store).

If I need to re-declare it over the changelog topic we need to have a really smart optimizer to know to leverage the existing one

Do you mean if someone executed the same statement again, e.g: issuing:

[CREATE MATERIALIZED VIEW resultView AS] SELECT count(*) FROM inputStream GROUP BY <some-grouping-condition>

multiple times? (with different view name)

If it's a transient query, then each instance would need its own temp table, as each would be streaming back the changes as the aggregate count increased.

If it's an MV, then initially we'd probably create a duplicate copy, but it wouldn't be too hard to detect that the query is exactly the same and to make the second view an alias of the first (with reference counting, obvs).

Or do you mean something else?

and this metadata might be lost when I move between KSQL clusters.

Yep, I'm lost here too...

The only difference is that a transient query might actually terminate if a corresponding ROWTIME/OFFSET predicate is specified.


#### Query "old" Table State:
Contributor

+1 these semantics are clear and easy to understand

@agavra agavra requested review from a team and agavra November 21, 2019 19:35
@vpapavas
Member

Hey @mjsax! Great write-up!

A high-level comment/question on the introduction of materialized views and why we need what you call a table.

If I understand correctly, what you define as a MV is what KSQL currently has and calls a table. It is a set derived from a topic/stream/table that receives updates. It is backed by a state store and allows one to issue pull and push queries. One can derive a table from another table and generally create a chain of updates propagating from topic to stream to table1 ... tableN.

I agree that this is more reminiscent of a materialized view than a table in the RDBMS world.

Now, what you define as a table is a stand-alone copy of an already existing state store (or a blank new state store, or an empty topic). If created from something already existing, was the initial state store created as part of an MV or a Table? I.e., can you create tables only from tables, or also from MVs? Moreover, this new table does not receive any updates, but one can explicitly do insert/update/delete. Can one create a MV from it?

My questions/concerns:

  1. What is the usage of this table you suggest? What are the extra semantics it introduces that are currently not part of KSQL's existing table? Why do we need them? I think the answer here is the knob to stop receiving updates.
  2. If you can create a table from another table, this already goes into MV land. Or is it just creating copies of tables?
  3. I understand you want to introduce something reminiscent of RDBMS tables. But in an RDBMS, one can create materialized views from tables, which you don't allow.

The only difference I see between a Table and a MV is that the former can be updated only explicitly. Then, the updates are propagated to other tables as with MVs. So, this begs the question: the semantics you want are those of a structure that does not receive updates from a topic/table/stream but rather is a standalone entity.

If so, these are the options:

  1. Start with topic. Create a stream from it (gets updated). Create a table from it (does not get updated). Create an MV from it (gets updated).
  2. Start with a stream. Create a stream from it (gets updated automatically). Create a table from it (does not get updated automatically). Create an MV from it (gets updated automatically).
  3. Start with a table. Create a stream from it (gets updated automatically). Create a table from it (does not get updated automatically). Create an MV from it (gets updated automatically).
  4. Start with a MV. Create a stream from it (gets updated automatically). Create a table from it (does not get updated automatically). Create an MV from it (gets updated automatically).

What are the semantics for each of these if all of them are allowed?


Semantic changes:

* An aggregation query over a stream returns a stream (and not a table)
Member

Why this? The difference between tables and streams is the deduplication/update semantics. What do you propose here?

Member Author

I propose that the output stream contains a record for every update to the "result table" (you can think of the result table as a TEMP TABLE in an RDBMS for this case, ie, the TABLE becomes an implementation detail).

The proposal is "necessary" to align to the overall design principles:

  • a stream query has an input stream and an output stream
  • it also allows for straightforward support of "transient/client" queries. If the result were a TABLE, a transient query would not return anything, even if a user rewrites the query to STREAM(SELECT * FROM stream ....)

A more detailed discussion about this behavior is at the end of the KLIP: https://github.com/confluentinc/ksql/blob/888691db3d43e508ee1d87095288166117b501e2/design-proposals/klip-11-DQL.md#advanced-query-patterns

Hope this answers the question.


CREATE MATERIALIZED VIEW <name> FROM <stream> -- the <stream> must be a registered stream from the catalog

CREATE MATERIALIZED VIEW <name> AS <query> -- the query must return a STREAM
Member

Why not create a MV from another MV or from a table?

Member Author

The <query> can either be a stream query or a table query, so what you ask is included. Does this make sense?

* (P2) a query with at least one input STREAM returns a result STREAM
* (P2-a) we can extend this principle to (stream-table)-table, stream-(table-table) and (stream-stream)-table joins (and any other n-ary query)

The reason for this design principle is that the "infinite" nature of an input stream dominates the "finite" nature of a table –
Member

Aren't both tables and streams infinite? A table gets continuously updated. Isn't their difference the deduplication/upserts? Whether a join returns a stream or a table should be a knob: do you want to see new records in the output or do you want to see updates if the same key already exists

Member Author

Aren't both tables and streams infinite?

Not from my point of view. The state of a TABLE is a finite set of rows. And a query is always against the table state. If you want to query the changelog, you need to use the STREAM(...) operator.

Whether a join returns a stream or a table should be a knob: do you want to see new records in the output or do you want to see updates if the same key already exists

I don't think you can have a knob for what the result type is; the result type is defined by the operator. And stream-stream, stream-table, and table-table joins are quite different. What I propose is as follows:

Table-table join: behaves like an RDBMS query; the (finite) state of both tables is joined, resulting in a final result (the query terminates; updates to the base tables are irrelevant; the result is not a materialized view that gets updates if the base data changes; ie, the query has "snapshot" semantics).

Stream-table join: this is a lookup join; for each stream record, a table lookup is performed to enrich the stream record. Hence, the output is a data stream.

Stream-stream join: always windowed; returns an output stream.
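A sketch of the three cases in (roughly) today's join syntax; the snapshot semantics of the table-table case are what this KLIP proposes, not current behavior, and all names are hypothetical:

```sql
-- table-table: joins the current state of both tables and terminates (proposed)
SELECT * FROM t1 JOIN t2 ON t1.key = t2.key;

-- stream-table: lookup join; returns a stream
SELECT * FROM s JOIN t ON s.key = t.key;

-- stream-stream: always windowed; returns a stream
SELECT * FROM s1 JOIN s2 WITHIN 1 HOURS ON s1.key = s2.key;
```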

Contributor

Table-table join: behaves like an RDBMS query; the (finite) state of both tables is joined, resulting in a final result (the query terminates; updates to the base tables are irrelevant; the result is not a materialized view that gets updates if the base data changes; ie, the query has "snapshot" semantics)

Confused - a classic DB can do a table-table join to create a MV. Are you saying this wouldn't be supported?

Member Author

@mjsax mjsax Nov 27, 2019

Creating a MV for a table-table join would be supported, but this would be a persistent query, while my comment was about transient queries.

Hence: SELECT * FROM table1 JOIN table2 ON t1.key = t2.key would be a transient query (ie, pull query). It computes a result based on the table state of both input tables (we need to define exact semantics for which version of the state we use) and terminates.

To create a MV, you would create a stream() on the result table and upsert it into the result MV:

// full clumsy syntax that we could simplify with implicit conversions
CREATE MV view AS SELECT * FROM STREAM(SELECT * FROM table1 JOIN table2 ON t1.key = t2.key)
// maybe (allow `STREAM(<table>)` to be a full query)
CREATE MV view AS STREAM(SELECT * FROM table1 JOIN table2 ON t1.key = t2.key)
// or even (implicit conversion from the query result table into a stream that is upserted)
CREATE MV view AS SELECT * FROM table1 JOIN table2 ON t1.key = t2.key

Member Author

mjsax commented Nov 22, 2019

@vpapavas

Now, what you define as a table is a stand-alone copy of an already existing state store (or blank new state store or empty topic). If created from something already existing, was the initial state store created as part of an MV or a Table?

Not sure what you mean here. Maybe the other answers clarify it.

I.e. can you create tables only from tables or also MVs?

Yes. It basically means that some state "snapshot" from the base table/MV is copied as the initial state of the new table (we need to precisely define which "snapshot" we use).

Moreover, this new table does not receive any updates but one can explicitly do insert/update/delete . Can one create a MV from it?

Yes. The MV would be updated each time the base table is modified.

  1. What is the usage of this table you suggest? What are the extra semantics it introduces that are currently not part of Ksql's existing table? Why do we need them? I think the answer here is the knob to stop receiving updates.

This new table is used the same way one would use an RDBMS table. Currently, KSQL does not allow modifying TABLES (because they are MVs) -- hence, we add support for "real" tables that users can modify. My belief is that if we really want to build a stream-relational database (and it's still an open question how far we push ksqlDB to become one), we need to allow users to use ksqlDB in a similar way to an RDBMS, ie, ksqlDB should be a superset of RDBMS functionality.

  2. If you can create a table from another table, this already goes into MV land. Or is it just creating copies of tables?

It's creating copies. That is the exact idea of the proposal. RDBMSs allow you to do the exact same thing. If we want to have clean semantics, we need to distinguish TABLES and MVs -- TABLES are "stand-alone entities" while MVs are "derived entities".

  3. I understand you want to introduce something reminiscent of RDBMS tables. But in an RDBMS, one can create materialized views from tables, which you don't allow.

This should actually be allowed long term. The proposal does not elaborate on it, but it's actually included: we have CREATE MV AS <query> -- the <query> can be against an RDBMS style TABLE to do this. As pointed out, on the read path there is no difference between TABLES and MVs.

The only difference I see between a Table and a MV is that the former can be updated only explicitly.

Correct. As stated in https://github.com/confluentinc/ksql/blob/888691db3d43e508ee1d87095288166117b501e2/design-proposals/klip-11-DQL.md#tables-vs-materialized-views

Then, the updates are propagated to other tables as it is with MVs.

Not to other TABLES (those are independent entities that might have been created as a copy of a table/MV initially). Only if one creates a MV from a TABLE are updates propagated.

Your last four points are exactly what I propose. Not sure what you mean by "What are the semantics for each of these if all of them are allowed?" -- or is this question answered by my answers above?

Member Author

mjsax commented Nov 22, 2019

@agavra About your general comment:

You raise fair points. There are many trade-offs to consider. As you mentioned, the design of the query language is a fundamental design decision. I also want to highlight that, to me, language design is not about defining keywords; it's about concepts. The underlying concepts of the data and query model are of course reflected in the language design. As you can see from this proposal, I talked about TEMP TABLES etc.; those are the concepts I think we need.

We also need to consider the compatibility question. Breaking compatibility gets worse the longer we wait (if we ever need to break it). This proposal avoids breaking changes, and I think it also sets us up to evolve the language in the future without breaking changes. As we know that we will add more language changes (eg, KLIP-10), we need to ask whether those changes might introduce the danger of breaking compatibility (and if yes, whether this proposal could avoid this issue).

Furthermore, I am not sure how some concepts proposed in this KLIP would be expressed otherwise (eg, upserting a stream into a table via TABLE(<stream>); the difference between TABLES and MVs and how we want to evolve the language with regard to allowing INSERT/UPDATE/DELETE on them -- again, this is not about names, but concepts) -- counter proposals are more than welcome! And of course, some proposed changes we can (and would) roll out incrementally (for example, the change from CREATE TABLE to CREATE MV could be done at any time in a backward compatible way, as I don't think we want to add RDBMS style tables any time soon; similarly, the AS OF <timestamp> clause to query tables in a temporal manner).

Hence, even if we don't change anything: if this KLIP helps to shape some design principles and fundamental concepts we want to build on, and if it helps us to reason about the language design, it's a win IMHO. Atm, I am missing the "big picture", and it seems important to get an idea of what this "big picture" might look like. If we have a vision, we can align each individual KLIP against this vision -- atm I see the big "danger" that KLIPs are done "in isolation" and we end up with a language that cannot be evolved any longer at some point, because individual decisions contradict each other.

And last, I strongly believe that this proposal makes the language intuitive and easy to use. :) I agree that we should not introduce complex concepts that we might need at some point in the future without validation.

## Background: Query Properties and Design Space

Currently, KSQL supports TABLES but does not support INSERT/UPDATE/DELETE statements on those tables.
The reason for this design is the fact that TABLES in KSQL are actually MATERIALIZED VIEWS,
Contributor

so are KTables, no? Don't know the history here, but I assumed the Table terminology was actually introduced from Kafka Streams?

Member Author

Sounds about right.


Tables and materialized views (short: MV) are quite similar but still different.
An important observation is that the difference between the two is only on their _write path_, but not on their _read/query_ path.
An RDBMS style table is rather "static" (but not immutable), as it's only updated with explicit insert/update/delete statements.
Contributor

could you clarify what you mean by "RDBMS style", since you use it in a few places? Are you talking about its features, its schema, or its storage?

Member Author

It's about the usage pattern. In an RDBMS, you create a table, it's empty initially, and you only modify it via explicit INSERT/UPDATE/DELETE statements. Does this clarify?
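A minimal sketch of that usage pattern in standard SQL (hypothetical table and values):

```sql
CREATE TABLE accounts (id INT PRIMARY KEY, balance INT);  -- starts empty
INSERT INTO accounts VALUES (1, 100);                     -- explicit writes only
UPDATE accounts SET balance = 150 WHERE id = 1;
DELETE FROM accounts WHERE id = 1;
```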

## What is in scope

The focus is to remove `EMIT CHANGES` and introduce new syntax to support different types of queries over streams and tables.
We want to exploit the stream-table duality and to transform STREAMS and TABLES into each other seamlessly.
Contributor

This is a recurring theme. Naively speaking, we can transform one into the other today, right? What exactly is KSQL unable to exploit today (aside from easier syntax)?

Member Author

Strictly speaking, you are right. One can currently always go via topics, eg, write a STREAM into a topic and read it back as a TABLE, or the other way round. Hence, easier syntax is one thing, but it also translates into the possibility of more efficient execution.
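The round-trip workaround being referred to looks roughly like this (a sketch with hypothetical names and 2019-era WITH options; details may vary):

```sql
-- write the stream out to a topic...
CREATE STREAM s_out WITH (KAFKA_TOPIC='shared_topic', VALUE_FORMAT='JSON') AS
  SELECT * FROM s;

-- ...then re-import the same topic as a table
CREATE TABLE t (id INT, v INT)
  WITH (KAFKA_TOPIC='shared_topic', VALUE_FORMAT='JSON', KEY='id');
```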

Contributor

I also think that many people aren't (initially?) aware of the trick of re-importing a TABLE as a STREAM, or vice versa. This proposal brings such functionality out of the world of tricks and into core concepts.



(it's up to the KSQL optimizer or KS runtime to make a materialization decision)
* the `TABLE()` operator consumes its input STREAM "from beginning"
* we could extend `TABLE()` to also accept a TABLE as input if it makes the language smoother;
it's obvious that `TABLE(<table>)` would be idempotent, i.e., `TABLE(t) = t`
Contributor

you mean an identity function?

Member Author

Ack. Identity is the right term (the identity function is of course idempotent itself...)


it's obvious that `STREAM(<stream>)` would be idempotent, i.e., `STREAM(s) = s`
* Introduce an `AS OF <timestamp>` clause for RDBMS style queries against tables
* Introduce `CREATE MATERIALIZED VIEW` statement
* Deprecate `CREATE TABLE` statement (in favor of the new `CREATE MATERIALIZED VIEW` statement) and eventually remove it
Contributor

The main thing I would like confirmation on from the product roadmap is that we plan to eventually build a TABLE that will take user writes (I think this is what you mean when you say RDBMS style?). Without that, this change seems unnecessary, given Streams has KTable and not KMaterializedView. Btw, any plans to change that? :)

Member Author

I agree that if we never introduce RDBMS style tables, this change would not be necessary. However, because the request to add INSERT INTO <table> keeps coming up, renaming would still help to explain why it's not allowed (which would be a nice side effect -- but might not be enough to really do it).

There are no plans to change the name in Kafka Streams atm. But I also think that ksqlDB can ignore what it's called in Kafka Streams.

Contributor

I think moving away from CREATE TABLE to CREATE MATERIALIZED VIEW massively helps people understand why you can't (shouldn't) INSERT directly into the view. See #3954.

@PeterLindner

Hi guys, I hope I may chime in.

As far as I understand the discussion so far, the significant difference between TABLE and MATERIALIZED VIEW is that a MV is read-only, and being read-only is desirable in a lot of situations where I would otherwise rely on my users behaving well (eg, #3954 or #3585) -- is that about right?

I think that an aggregating stream returning a stream, as @mjsax proposed in this KLIP, would lead to the same problem described in #3954, because I could also INSERT INTO the resulting stream and mess up the aggregated values. It may be worthwhile to add a MERGE() operator to this KLIP, which takes several streams and outputs a single merged stream (like a persistent INSERT INTO does now), and at the same time limit INSERTs to RDBMS style entities. This would have some benefits:

  • TABLEs, as they are now, could be a first implementation of RDBMS style tables, since they can already be created empty and already allow for inserts and updates (only delete / writing tombstones is missing). Therefore TABLEs wouldn't have to be deprecated in this KLIP.
  • STREAMs could also be read-only (like @big-andy-coates suggests in Insert values: should not allow insert into tables KSQL does not own. #3954 (comment)), but one could still benefit from manually generating streams in ksqlDB with STREAM(myRDBMStable) for development work, and if I absolutely have to edit values in existing streams, I could safely do a [CREATE STREAM modifiedStream AS] SELECT * FROM MERGE(existingStream, SELECT * FROM STREAM(myRDBMStable)) and not mess up existing topics.
  • Taking a snapshot of a stream could later be an INSERT INTO myRDBMStable SELECT * FROM aStream.
  • Merging streams would happen in a single query and could therefore easily be terminated (which, as far as I know, is not so easy with INSERT INTO).
  • This would also allow providing better ordering guarantees on top of merged streams, since there would be a single writer.

On a side note: for a more sophisticated implementation of RDBMS style tables on top of Kafka, you could take a quick look at KarelDB from your fellow @rayokota. I haven't tested it myself but recently came across his blog post and thought it could be great for ksqlDB.

Member Author

mjsax commented Nov 24, 2019

Hi guys, I hope i may chime in.

Absolutely!

As far as I understand the discussion so far, the significant difference between TABLE and MATERIALIZED VIEW is that MV is read only and being read only is desirable in a lot of situations where I would otherwise rely on my users behaving well ( eg. #3954 or #3585 ), is that about right?

Sounds about right to me.

In general, I agree that the current "ownership model" of ksqlDB needs improvements. As you mentioned, you can create a TABLE from an existing topic (in which case it's actually a MV, and INSERT INTO should not be allowed IMHO), or let ksqlDB create the topic for you (in which case, from my understanding, the topic is not really locked down, and "external writes" would be possible). However, ksqlDB does not distinguish both cases atm. Also, for this case, INSERT INTO <table> does not have read-after-write consistency, because the write first goes into the topic (which is necessary for correctness reasons) and is later read back by ksqlDB... The semantics of INSERT INTO <stream> are also questionable to me.

While I do have thoughts on the "ownership model", I am not sure if this should be a separate discussion, even if it is somewhat related to this KLIP. If we believe that we should not decouple both discussions and extend this KLIP instead, I am also happy with that.

About merge(): that is an interesting idea, but it seems somewhat orthogonal to this KLIP, hence, I would like to exclude it. Thoughts?

We (at least myself) are aware of KarelDB and KCache. Robert is a colleague at Confluent. :) I did not try it out myself and I guess @rayokota should answer the question how mature this project is.

Contributor

@big-andy-coates big-andy-coates left a comment


Hey @mjsax,

Thanks for putting this all down. I think it really helps people to think about the big picture here.

Like the table() and stream() methods. The Calcite SQL paper also mentioned these, well the table one anyway. What I particularly like is that the table method is not needed anywhere where we'd want to remain standard SQL compliant, as everything that needs to be standard compliant is already a table.

I'm strongly in favour of switching from CREATE TABLE to CREATE MATERIALIZED VIEW, and fixing our INSERT VALUES to not allow inserting into views! We can choose to keep CREATE TABLE, but make it so that PARTITIONS is required in the WITH clause, i.e. use it to create a static table. This should allow INSERT VALUES.

I actually like @PeterLindner's suggestion of dropping INSERT INTO in favour of MERGE functionality. (Thanks for chiming in Peter!) I think this is easy to make work.

I think it would help to summarize the proposed changes, e.g.

  1. Addition of table() operator to convert a stream to a table
  2. Addition of stream() ...
  3. Change return type of stream aggregation to stream
    etc.

It would be good to see some input from @MichaelDrogalis / @derekjn.

The good thing is that we'll be able to make changes such as these without breaking backwards compatibility with running queries once @rodesai's awesome work is complete to persist query plans.

* introduce `STREAM(<table>)` operator: takes a TABLE as input, and returns a STREAM that contains a record for each update to the TABLE
* note: the returned STREAM is a fact stream, and would be processed with corresponding STREAM semantics (it's semantically not a changelog stream)
* note: the input TABLE can either be an RDBMS style table or a MV
* by default, the output STREAM contains one record for each input table record (full table scan on the table generation when the query was issued)
Contributor

To be clear here, you're proposing that STREAM(someTable) will return a stream containing one row for each existing row in the table, followed by one row per update to the table?

How would tombstones be handled? e.g. if a row is deleted from the table it will emit a record with a null value. Currently, KSQL will handle this OKish: I think it treats a null value in a stream the same as a value where every column is null. However, if / when we have the concept of NOT NULL we may run into issues.

Thoughts?

* can we do better?
* For the ROWTIME termination criteria, it's unclear how to handle out-of-order data
* I think we can add a `WITH GRACE PERIOD OF <someTimePeriod>` to cover this case though
* How would we handle data with a larger timestamp than the specified ROWTIME bound; **proposal:** those records should be filtered out
Contributor

It should definitely be filtered out, as it wouldn't pass the WHERE ROWTIME <= criteria.

Member Author

I applied the same reasoning.

* If a user wants to "read over" and not filter out out-of-order data, the predicate should be `OFFSET < offsetByTimestamp(endTime + gracePeriod)`
* By default, we would terminate on the first record with larger timestamp than specified
* For the ROWTIME start point criteria, how should out-of-order data be handled?
* **proposal:** out-of-order data with smaller timestamp than specified should be filtered out
Contributor

As above, should be filtered out.

* It seems that `ROWTIME <= now()` semantics might not be intuitive as it mixes event-time and wall-clock time:
* I would expect that people often just want to stop at the end of the topic and confuse both concepts
* can we do better?
* Is `OFFSET <= end()` too clumsy to express what many people may want, i.e., terminate at the end of the topic?
Contributor

As mentioned, offsets are per-partition. However, I think offset is the most accurate way of expressing I want to query 'from now onwards' or 'up to now', where 'now' means the current data in Kafka at this point. ROWTIME, as you mention, has issues with wall-clock vs event-time and out-of-order data. Offsets are very precise.

Maybe we can extend our syntax to support some kind of tuple/structure so we could have something like:

WHERE (PARTITION, OFFSET) <= end()

Where end() returns offsets by partition and KSQL can handle this to ensure each partition knows where to stop, (or start).

Member Author

I like that idea, even if the syntax is not great IMHO, as <= has no straightforward semantics for pairs. If end() returns pair (p,o) we want that PARTITION = p AND OFFSET <= o, which is not really expressed above. (Also, users might want to use < and <=, and both should be supported IMHO.)

Maybe we can have WHERE OFFSET <= endOfPartition() or WHERE OFFSET <= end(PARTITION) ?

A more fundamental question that I also excluded in this KLIP is if/how we want to expose metadata like OFFSET and PARTITION to begin with. Do we consider them as part of the schema? Or should we have some functions that return the offset/partition? As offsets/partitions are immutable, they should also be handled differently than timestamps and headers IMHO -- at least for headers, we need to have a way for people to modify them (maybe even for timestamps). This might be a separate KLIP though...

Contributor

Do we consider them as part of the schema

No. They should not be part of the schema. Having them in the schema causes problems. SELECT * FROM FOO should not include any metadata by default. If the user wants to access metadata columns then they should be doing:

SELECT *, offset() AS sourceOffset, partition() AS sourcePartition FROM FOO;

Contributor

if the syntax is not great IMHO, as <= has no straightforward semantics for pairs. If end() returns pair (p,o) we want that PARTITION = p AND OFFSET <= o, which is not really expressed above

Agreed, syntax needs work: it needs to fit our type system and its semantics. However, I'm just trying to express the concept.

... WHERE offset() < latestPartitionOffsets()[partition()];

Member Author

The argument that offset, partition, and topics are not part of the schema makes sense to me.

It's still unclear to me if we want to allow timestamp modification, ie, setting ROWTIME explicitly? What alternatives do we have to model ROWTIME -- the current implicit column is different to a function that returns the timestamp() -- why do we want/need this distinction?

Similar question about headers?



-- query table version for the given timestamp
-- note that _some_timestamp_ could conceptually also be in the future -- it's unclear if we would allow this or not and what the implications are
Contributor

The implications would be the query would block until that time.

it's unclear if we would allow this

Given event time can be running far behind wall-clock time, your proposal already allows this when using now().

Member Author

Yes, I am aware. The underlying question is: should we allow/support this? Do we believe that people will understand or would it be too confusing if a query "hangs" (for unclear reasons)? The difference between a user passed in timestamp, now(), and latest() might be quite subtle to understand due to the mix of processing- vs event-time.

If we consider it as too complex, what alternatives do we have? (I would love a simpler way, but could not come up with anything... :( )

Contributor

I think we have to expose this. Otherwise we restrict what (knowledgeable) users can express.

Member Author

I am fine with that (in the end it is what I proposed in the KLIP). Just want to make sure we are aware of the implications and that we need to carefully educate user about it.

Comment on lines +699 to +704
To get the desired result, the user must specify that the outer query should be answered only after the input stream is processed "completely"
(with the definition of "completely" as up to the end-offset when the query was started). Hence, the user needs to add an explicit AS OF latest() clause:

```sql
SELECT * FROM TABLE(SELECT count(*) FROM stream) AS OF latest()
```
Contributor

Not sure this example SQL works.

I think it would still return an empty table, because latest() would be applied to the output of the inner select, which would be offset zero.

Breaking it down:

  • SELECT count(*) FROM stream: returns a stream that will read the input stream from the beginning and build an aggregate, outputting each change, but as the query has only just started, it won't have processed any source messages, so the output stream will be empty.
  • TABLE(<stream>) AS OF latest(): converts the returned stream to a table AS OF the stream's latest offsets, which will be zero as the stream is empty.

Or am I missing something?

Member Author

Not sure this example SQL works.

Me neither. Compare below:

It's a little fuzzy, how the AS OF latest() clause can be translated and executed cleanly though – it's more like a high level idea atm.

Hence, what you say makes total sense to me. We would need to define some translation rules to execute the query like:

TEMP TABLE foo = TABLE(SELECT count(*) FROM stream WHERE offset <= end())
SELECT * FROM foo;

The first query would only terminate after the inner stream query terminates. Hence, foo is "fully loaded" to version "latest" and thus we can just do a simple select on foo. Ie, we somehow translate AS OF latest() into WHERE offset <= end() of the inner query.

Or we need to find a totally different way to express it.

Contributor

So here you're using Table(<stream>) to get an RDBMS style static table, right? Whereas Table can also be used to create a MV too, right?

This is muddy / confusing. Is this just a badly worded example, or is there either some confusion over what TABLE() returns, or is it context sensitive?


from the Public APIs section of KLIP-11:

TABLE() operator: takes a STREAM as input, and changes its semantic interpretation from facts to updates; it returns a TABLE

note: it's not specified if the returned TABLE is materialized into a state store or represented as a changelog (it's up the the KSQL optimizer or KS runtime to make a materialization decision)

and from the Tables vs. Materialized Views section

Tables and materialized views (short MV) are quite similar but still different. An important observation is that the difference between both is only on their write path, but not on their read/query path.

As I understand it, TABLE(<stream>) is some intermediate thing that changes the semantics of the stream from facts to a changelog and can be read like a table but can't stand on its own. According to the KLIP the write path makes the difference. So you could perform three operations on this intermediate TABLE(<stream>) thing:

  • materialized push query with CREATE MATERIALIZED VIEW AS SELECT * FROM TABLE(<stream>) (could be cast implicitly)
  • materialized pull query with CREATE TABLE AS SELECT * FROM TABLE(<stream>) (could be cast implicitly)
  • transient pull query with SELECT * FROM TABLE(<stream>)
  • transient push query does not make sense on a table (one would query the changelog)

However a shorthand syntax was proposed, so that TABLE(<stream>) on its own would actually be interpreted as a transient pull query (ie omitting the SELECT * FROM). I guess that's where the misunderstanding comes from.
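Spelled out as full statements, those operations might look like this (a sketch using the operator syntax proposed in this KLIP; none of this is implemented):

```sql
-- materialized push query: the view keeps updating as the stream produces changes
CREATE MATERIALIZED VIEW mv AS SELECT * FROM TABLE(myStream);

-- materialized pull query: a named, queryable table built from the stream's updates
CREATE TABLE t AS SELECT * FROM TABLE(myStream);

-- transient pull query: returns the table state at the time the query is issued
SELECT * FROM TABLE(myStream);
```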

Member Author

So here you're using Table() to get an RDBMS style static table, right?

No. table(<stream>) would always return a MV -- the table() operator consumes the stream and writes into the MV (note that it is a TEMP object as it does not have a name).

So maybe it should have been TEMP MV foo = ...

However, there is an outer pull/table query, SELECT * FROM <tableOrMV> AS OF latest() and this query is always against state (there is no difference between a table or a MV for this case).

Does this make sense?


## What is in scope

The focus is to remove `EMIT CHANGES` and introduce new syntax to support different types of queries over streams and tables.
Contributor

The focus is to remove EMIT CHANGES

Humm...

The point of adding the EMIT syntax was to control materialization of the result. Initially we introduced EMIT CHANGES, but conceptually this contrasted EMIT FINAL, even if it wasn't in the syntax.

KLIP-10 is proposing to introduce EMIT FINAL. How does this KLIP interplay with KLIP-10?

Is this how you see it working:

| Use case | EMIT syntax | Proposed syntax |
| --- | --- | --- |
| get value(s) from table | `SELECT * FROM MY_TABLE [EMIT FINAL]` | `SELECT * FROM MY_TABLE` |
| get stream of changes from a table | `SELECT * FROM MY_TABLE EMIT CHANGES` | `STREAM(SELECT * FROM MY_TABLE)` |
| windowed persistent query that outputs intermediate results per window | `SELECT COUNT() FROM MY_STREAM WINDOW TUMBLING (SIZE 30 SECONDS) EMIT CHANGES` | `SELECT COUNT() FROM MY_STREAM WINDOW TUMBLING (SIZE 30 SECONDS)` |
| windowed persistent query that outputs only final result per window | `SELECT COUNT() FROM MY_STREAM WINDOW TUMBLING (SIZE 30 SECONDS) EMIT FINAL` | `TABLE(SELECT COUNT() FROM MY_STREAM WINDOW TUMBLING (SIZE 30 SECONDS))` |

How about if/when we introduce the extensions to the EMIT syntax, as covered by the klip, e.g. rate limiting?

Member Author

That is an excellent point and my thoughts on it are not final. However, if you consider the stream() operator as different syntax for emit changes (not 100% sure if this holds up exactly), we could extend the stream() operator to embed all those "rate control" mechanisms. Basically, both stream() and emit changes are some kind of CDC operation on the table.

There is also a difference between queries against streams and tables IMHO, and emit changes only applies to push/stream queries. The emit changes (and emit final) keyword does not really apply to pull/table queries IMHO and thus should not be supported/allowed for this case (it would also not be compliant with the ANSI SQL that we aim to support, from my understanding).

I am also wondering, if EMIT FINAL is actually a property of the query, a property of the CDC operator, or the property of the window-aggregation-operator itself. To me, it seems that CDC rate control and emit final are two different things and that final is a property of the window-aggregation-operator itself. If this holds up, using EMIT CHANGES and EMIT FINAL syntax would convolute two concepts that should be separated. I want to point out, that I am not 100% sure about this point---just wanted to share my current understanding.

The first three examples for "proposed syntax" sound about right. For the last one, the result table would still be updated with every update because the "inner query" returns the full changelog stream. Also, the statement would only make sense as a persistent CTAS query. A transient query would return an empty table, as it returns the current table state when the query is issued and this table state would be empty at that point. (compare section https://github.com/confluentinc/ksql/blob/888691db3d43e508ee1d87095288166117b501e2/design-proposals/klip-11-DQL.md#advanced-query-patterns which discusses this point in more detail)

Hence, if the final property is a property of the window-aggregation-operator, it should not be part of the stream() operator (or EMIT clause). And for the last example, we would need to figure out how we change, eg, WINDOW TUMBLING (SIZE 10 SECONDS) syntax to allow specifying the final property.
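For illustration only: if final were a property of the window-aggregation-operator, the window definition itself might carry it (purely hypothetical syntax, not something this KLIP proposes):

```sql
-- hypothetical: "final" attached to the window definition instead of an EMIT clause
SELECT key, COUNT(*)
FROM myStream
WINDOW TUMBLING (SIZE 10 SECONDS, EMIT FINAL)
GROUP BY key;
```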

Contributor

Copied from KLIP-10 comment for context #3754 (comment)

@mjsax

From the discussion in KLIP-8, the proposition was that for a given query, e.g.

SELECT * FROM FOO WHERE ROWKEY=x;

(Where FOO is a table that's computing an aggregate, e.g. CREATE TABLE FOO AS SELECT COUNT() FROM BAR GROUP BY ROWKEY;)

You can either request the changes of how the final result was built, using EMIT CHANGES, or only the final result, using EMIT FINAL, with EMIT FINAL being the default if none is explicitly supplied.

EMIT CHANGES would output the stream of CDC events of the history of how the count, for this key, changed over time. (Obviously, key compaction will delete history, but that's configurable).

EMIT FINAL would get the current final result for the key. (Either from a state store, or potentially from reading the source topic and computing the aggregate. Using the state store is just an optimisation).

So in this sense both EMIT CHANGES and EMIT FINAL are valid.

Of course, that's not taking into account KLIP-11, which proposing using SELECT * FROM STREAM(SELECT * FROM FOO WHERE ROWKEY=x);.

However, this klip currently stands alone, or rather sits on top of klip-8. As such the EMIT FINAL is actually implicit for pull queries. Of course, we can decide if that should continue to be the case, but I think that is better discussed in klip-11.

Contributor

Essentially, what I think we're saying is that:

  • STREAM() would be the same as EMIT CHANGES (though the current proposal is that stream() does not expose historic CDC events by default, but users could get at them using STREAM(SELECT * FROM TABLE AS OF 0)).
  • TABLE() would be the same as EMIT FINAL

I am also wondering, if EMIT FINAL is actually a property of the query, a property of the CDC operator, or the property of the window-aggregation-operator itself.

I don't think it's a feature of the window aggregation operator. The window-aggregation-operator could be expressed as a table function, e.g.

SELECT * 
FROM hoppingWindow(
    myStream,
    myStream.eventTime,
    SIZE 30 SECONDS
)

which then outputs a table (1 or more rows) per input event that reflects all the windows the input row falls into. (The schema of the output would be the schema of the stream, plus columns for window bounds.) However, to me, doing so demonstrates that the materialization control EMIT is providing is not part of the window-agg-op, but of the outer query itself, i.e. the query has an input that is the output of the table function, and the result materialization is performed on the query output, not the table function output.

Comment on lines +363 to +374
**Example 400: subscribing to a changelog stream (transient continuous query of an input table)**

```sql
[CREATE STREAM myChangelog AS] SELECT * FROM STREAM(tableOrMaterializedView)
-- returns a STREAM
-- compare Example 100 (those properties apply)
-- without CSAS prefix, it is the transient query counterpart to Example 300 – all updates are "streamed" continuously to the client
-- (note, that the result is a fact-stream and it's the client responsibility to "interpret" the records with update semantics)
-- The STREAM(...) operator creates a changelog stream from its input TABLE (can be a RDBMS style table or a MV); compare Example 0
-- The output stream contains one record for each record in the table (ie, full table scan) based on the table generation when the query was issued
-- plus a record for all future updates
```
Contributor

I'm assuming you can do something like STREAM(SELECT * FROM TABLE AS OF 0) to get the full (compacted) change log of a table + updates?

Member Author

The full syntax would be

SELECT * FROM STREAM(SELECT * FROM TABLE AS OF 0)

It's an open question that I raised on the KLIP already, if STREAM(...) would be a valid query by itself from a grammar point of view. If yes, then STREAM(SELECT * FROM TABLE AS OF 0) would be a short version of the full syntax -- I don't have a strong opinion on this question atm (also not sure if this is a fundamental design question or a question about syntactic sugar?)

Comment on lines +96 to +98
* updating a TABLE from version v to v+1 might imply multiple updates and might take some time,
hence, at any point in wall-clock time a TABLE might be in an inconsistent state
(we call those the _generations_ of a table, i.e., for each table version, there might be multiple generations and only the last generation is consistent).
Contributor

Not sure I fully follow what these generations are. Can you add an example / elaborate?

Member Author

Let's say you have the following input changelog topic that you want to materialize as table state:

<key,value,timestamp>
<A,1,100> <B,2,100>, <A,5,200>, <B,6,200>

There are two table versions:

t=100    t=200
A = 1    A = 5
B = 2    B = 6

But each version has two generations (ie, each applied input record advances the generation)

t=100    t=100     t=200     t=200
g=0      g=1       g=2       g=3
A = 1    A = 1     A = 5     A = 5
         B = 2     B = 2     B = 6

g=0 represents an incomplete state of version=100 because not all updates for version=100 are applied yet. Similarly, g=2 represents an incomplete state of version=200 for the same reason.

Using the term "version" vs "generation" makes it simple to talk about semantics, because it highlights the difference between processing-time and event-time. For example, we can say that a pull query is always executed at the latest version and generation when the query is submitted. Or if we introduce an AS OF <timestamp> keyword, we would give a guarantee about the table version that is queried (but not necessarily which generation within the version is queried etc). We can also define "current version" as the materialized version when a query is executed (in system time) vs "latest version" as whatever is available in the topic (assuming the table materialization lags).

Does this make sense?

Contributor

Yep, thanks. IMHO this is worth including in the klip

And of course we need some concept of a grace period if we're to have the concept of a 'final generation' / 'state version', as otherwise out-of-order data may always be possible in the future, right?

Member Author

Ack. I'll extend the KLIP to add those details.

About the second point: it depends on what guarantees/semantics we want to support. If we say (what I would propose atm) that AS OF <timestamp> gives you the latest generation of the specified version (ie, timestamp), we don't really need it because we don't give any guarantee that the version is final.
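Concretely, that would read as follows (a sketch using the AS OF syntax discussed in this KLIP):

```sql
-- returns the latest *generation* of table version 1534756000; there is no
-- guarantee that this version is final, as out-of-order updates may still arrive
SELECT * FROM myTable AS OF 1534756000;
```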

@PeterLindner

About merge(): that is an interesting idea, but it seems somewhat orthogonal to this KLIP, hence, I would like to exclude it. Thoughts?

Since you introduce the concept of operators in this KLIP I thought it wouldn't make sense to propose a MERGE()-operator without the context of this KLIP, but feel free to move it elsewhere.

We can choose to keep CREATE TABLE, but make it so that PARTITIONS is required in the WITH clause, i.e. use it to create a static table. This should allow INSERT VALUES.

That's exactly what I meant with "first implementation of RDBMS style table". It lacks read-after-write guarantees but I don't know if users expect that from a ksqlDB table. As of now, ksqlDB doesn't support it anyways.
If I understand the KCache code correctly, it waits until the internal Consumer has read up to the offset before it acks the write, to provide read-after-write guarantees. Maybe you can integrate something similar into INSERT VALUES 😉

Btw. I also think that INSERT VALUES into streams is quite dangerous, but the STREAM(<table>)-operator from this KLIP would be an almost equivalent (but safer) alternative.

I'm strongly in favour of switching from CREATE TABLE to CREATE MATERIALIZED VIEW

As an upgrade path you could first deprecate CTAS in favor of CMVAS (but keep CT as stated above) and then reintroduce CTAS with the semantics of this KLIP later. Seems like a rather smooth transition instead of a hard change to me.

@mjsax
Member Author

mjsax commented Nov 27, 2019

About INSERT INTO vs merge() -- another line of thinking would be to use the more SQL-native UNION keyword (a UNION DISTINCT would not be allowed for streams though). Just another idea.
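A sketch of that idea, assuming UNION ALL were supported over streams (it is not today):

```sql
-- UNION ALL keeps duplicates, which matches fact-stream semantics;
-- UNION [DISTINCT] would require de-duplication state and would be disallowed
CREATE STREAM combined AS
  SELECT * FROM stream1
  UNION ALL
  SELECT * FROM stream2;
```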

@big-andy-coates
Contributor

big-andy-coates commented Nov 29, 2019

Thinking more on the table vs mv discussion, I think @mjsax and @PeterLindner are close to being in alignment with the following, but writing it out to clarify my thoughts and get it down in writing.

Assumption:

  • A stream is a stream of facts. Each fact stands alone. Hence it is possible to have a stream with many people writing to it.
  • A table has per-key update semantics. It is therefore crucial that there is control over how a table is updated. In terms of KSQL and changelog topics, this means only a single writer per partition. Allowing arbitrary writing to a table leads to undesirable/undefined behavior, e.g. if two writers update a row, the later writer will overwrite the first writer's update.
  • A materialized view is like a table, except its data is computed from upstream source(s). Updates to the view are not allowed. If you want to update the view, you need to update the source(s).

Proposal:

  • Split current CREATE TABLE statement in two:
    • CREATE MATERIALIZED VIEW: imports an existing topic containing a changelog. It is assumed the changelog is updated by some upstream entity. KSQL does not own the topic, will not allow writes to it and will not delete it.
    • CREATE TABLE: creates a new topic or explicitly takes ownership of an existing topic containing a changelog. It is assumed that any pre-existing topic is not updated by another system; if it is updated by another system the behaviour is undefined. KSQL owns the topic, writes are allowed, and KSQL will automatically delete the topic if/when the table is dropped. This is essentially a 'static' RDBMS style table. We should not allow persistent queries to write to it.
  • Split CREATE TABLE AS SELECT statement in two:
    • CREATE MATERIALIZED VIEW AS SELECT: creates a new changelog topic, failing if the topic already exists (current behaviour). KSQL owns the topic, but as it's a view, updates to it are not allowed. KSQL will delete the topic if the view is dropped.
    • CREATE TABLE AS SELECT: future work: not proposed for now. Would create a static RDBMS style table, which KSQL owns. Updates allowed. KSQL will delete on drop.

IMHO this is clean and intuitive. Views do not support INSERT/UPDATE/DELETE, tables do. Users can be explicit about how they want KSQL to manage the changelog topic.

Switching to MATERIALIZED VIEWs also sets us up in the future to support non-materialized VIEWS, which would allow people to define a set of views and then use them to build a final materialized output, i.e. it would allow users to avoid materializing intermediate steps like they have to today.

Extending this to streams:

  • Split CREATE STREAM in two:
    • CREATE STREAM VIEW: like CREATE MV, imports a topic containing a stream into KSQL. KSQL does not own the topic. Updates are not allowed. KSQL will not delete the topic.
    • CREATE STREAM: like CREATE TABLE: creates a new topic, or takes ownership of an existing one. KSQL owns the topic and will delete it on a drop. Only inserts will be allowed. As streams don't need a single writer, inserts by other systems are supported.
  • Split CREATE STREAM AS SELECT in two:
    • CREATE STREAM VIEW AS SELECT: creates a read-only stream derived from some source(s). KSQL owns the topic. Inserts not allowed. KSQL will delete the topic on drop.
    • CREATE STREAM AS SELECT: creates a stream (and topic) from a one-time query. Owned by KSQL. Inserts allowed. Will be deleted on drop.
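To make the proposed split concrete, here is a sketch of the resulting statement families (hypothetical syntax and names, following the semantics described above):

```sql
-- import an externally maintained changelog; read-only, topic not owned by KSQL
CREATE MATERIALIZED VIEW prices (sym VARCHAR KEY, price DOUBLE)
  WITH (KAFKA_TOPIC='prices', VALUE_FORMAT='JSON');

-- KSQL-owned, RDBMS-style static table; INSERT allowed, topic deleted on DROP
CREATE TABLE users (id VARCHAR KEY, name VARCHAR) WITH (PARTITIONS=1);

-- derived view; KSQL owns the topic, but writes to the view are not allowed
CREATE MATERIALIZED VIEW user_counts AS
  SELECT name, COUNT(*) FROM users GROUP BY name;

-- imported stream; topic not owned by KSQL, inserts not allowed
CREATE STREAM VIEW clicks (userId VARCHAR, url VARCHAR)
  WITH (KAFKA_TOPIC='clicks', VALUE_FORMAT='JSON');
```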

I'm going to copy this into the issue that is tracking this specific bit of work as well: #3773

cc: @derekjn, @MichaelDrogalis & @agavra for comment

@vpapavas
Member

vpapavas commented Dec 2, 2019

I am confused about the stream() and table() operators. It seems that they are replacing emit changes and pull queries respectively. Is this true? Hence, they only control the output of a query?

Regarding the JOIN, I believe that what it does and what the output is should not depend on its input parameters. Because if they do, then we are overloading its definition. Unless you are suggesting we have different join operators, the stream-join, the table-join, the mixed-join, etc. Which I don't think makes sense at the syntax (logical) level. Especially since we have operators that control the output (emit changes, or your stream/table).

Let's say we have only one JOIN operator which does what we all know it does. Then, using the existing language, the semantics of the queries below are clear and I don't need to think what the input is to understand what the output will be.

1a. SELECT * from stream1 JOIN stream2 : Join two inputs with whatever is there at the moment the query is issued and return the result and terminate. (Pull query)

2a. SELECT * from stream1 JOIN stream2 emit changes : Join two inputs with whatever is there, don't terminate but keep updating the result as new data arrives. (Push query)

3a. SELECT * from table1 JOIN table2: The same as with example 1.

4a. SELECT * from table1 JOIN table2 emit changes: The same as with example 2.

5a. SELECT * from table1 JOIN stream2: The same as with example 1.

6a. SELECT * from table1 JOIN stream2 emit changes: The same as with example 2.

This is very simple and clear to me. The behavior of the output is dictated by the operator EMIT CHANGES which has well-defined semantics. The JOIN does not control the output and it shouldn't.

How do I achieve the above using the new syntax?

1b. TABLE(SELECT * from stream1 JOIN stream2) : Does this do what example 1a above does?

2b. STREAM(SELECT * from stream1 JOIN stream2) : This corresponds to example 2a?

3b. TABLE(SELECT * from table1 JOIN table2): Example 3a?

4b. STREAM(SELECT * from table1 JOIN table2): Example 4a?

5b. TABLE(SELECT * from table1 JOIN stream2): Example 5a?

6b. STREAM(SELECT * from table1 JOIN stream2): Example 6a?

If this is true, then we agree that the JOIN is one operator and what it does does not depend on the input. Also we agree that EMIT CHANGES = STREAM and (no modifier) = TABLE, and they only control the output of a query.

@mjsax
Member Author

mjsax commented Dec 3, 2019

I am confused about the stream() and table() operators. It seems that they are replacing emit changes and pull queries respectively. Is this true?

Not sure if I would phrase it that way. The overall proposal changes the underlying "base model" conceptually and thus the comparison seems not to be 100% accurate. Following this KLIP, if you query a table, it's always a pull query. If you query a stream, it's always a push query. It's a different way to approach the problem.

If you want to get a "push query over a table" (something that does not exist in this model), you need to get the table changelog stream and query the stream (to get a push query over a stream). And querying a "stream in a pull fashion" is a somewhat fuzzy term anyway IMHO: if it means a "range of time" (from beginning-to-now) over the stream, you would upsert the result stream into a table, and then query the table state following this proposal (again, you can only "pull" from a table).

Does this make sense?

About joins, I don't see a big issue with overloading the semantics. Overloading is a well established and well understood concept. About your examples: How would you compute example (2a)? Without a time window definition for a stream-stream join we need to materialize both streams into RocksDB, and because of EMIT CHANGES we can never purge any state and thus the query would "blow up" eventually.

Not all of your examples about how the new syntax would work are correct though:

1b. SELECT * FROM stream1 JOIN stream2 WITHIN <timeWindow> WHERE stream1.OFFSET < end() OR stream2.OFFSET < end()

Added a required time-window definition for the stream-stream join. Also, you just want the query to terminate, but the result is still a STREAM, not a TABLE.

2b. SELECT * FROM stream1 JOIN stream2 WITHIN <timeWindow>

The output is a STREAM and thus, there is no reason to use the stream(...) operator.

3b. SELECT * from table1 JOIN table2

The output is a TABLE and thus, there is no reason to use the table(...) operator.

4b. correct; we must get the changelog stream of the result table of the inner query

5b. SELECT * from table1 JOIN stream2 WHERE stream2.OFFSET < end()

You just want the query to terminate, but the result is still a STREAM, not a TABLE.

6b. SELECT * from table1 JOIN stream2

The result is a STREAM already and there is no reason to use the stream(...) operator.

Does this shed some light about what this KLIP proposes?

@big-andy-coates
Contributor

Yep, makes sense Peter. It's been a while since I read the whole KLIP ;)

@mjsax
Member Author

mjsax commented Feb 14, 2020

Thanks for your replies @PeterLindner and @big-andy-coates!

As stated originally, I see pros/cons for different approaches and your arguments to favor suggestion "(a): (WHERE clause only for filtering but not to terminate a query)" are convincing to me. My main concern was the question of how to specify a potential termination criterion, but @big-andy-coates's suggestion to use EMIT WITH GRACE PERIOD OF 1 MINUTE looks quite appealing to me. If we don't think that it will be too confusing for users that a regular time-range query does not terminate by default but waits for out-of-order data indefinitely, we could follow that route.

Would love to get some feedback from @vpapavas @agavra @vinothchandar @miguno ?

If we agree on a general direction based on the current discussion, we should do a spin-off KLIP similar to the "materialized view" KLIP to keep the discussion focused and to push something over the finish line. If anybody wants to do this spin-off KLIP, feel free to do it :)

@miguno
Contributor

miguno commented Feb 17, 2020

Given the nature of a stream, there are the following design choices:

a) a query over a stream always considers past and future data and thus it does not terminate by default
b) a query over a stream always considers only future data and thus it does not terminate by default
c) a query over a stream only considers past data and it does terminate by default (ie, does not consider future data)
d) a query over a stream always considers only future data and it does terminate by default (not really an option, because it would result in an empty result by default)

Edited (fixed the typo where I wrote that I prefer (a), but I actually meant (c)): Actually, I have a different opinion than others here because I'd favor (c) as the default behavior for:

SELECT * FROM myStream

because this behavior is consistent with a SELECT * FROM myTable (a pull query).

For the other parts below, I agree however.

WHERE clause to control termination:

SELECT * FROM FOO 
  WHERE 1534756000 <= ROWTIME AND ROWTIME <= 1534757000 
  EMIT WITH GRACE PERIOD OF 1 MINUTE;

I do like @big-andy-coates's suggestion here, which is a combo of a WHERE clause on ROWTIME and the addition of a GRACE PERIOD. (The syntax for the latter could be changed perhaps, but the building blocks I like.)

For what it's worth, here's an enhanced example that includes EMIT CHANGES explicitly:

SELECT * FROM FOO 
  WHERE 1534756000 <= ROWTIME AND ROWTIME <= 1534757000 
  EMIT CHANGES WITH GRACE PERIOD OF 1 MINUTE;

Questions:

  • Is this verbose example/syntax what you'd have in mind? (Though EMIT CHANGES would always be implicit for a stream, right?)
  • I suppose OF 1 MINUTE always refers to event time, not wallclock time, right? (and ROWTIME = event time)

I just want to double-check that whatever syntax we propose here would work with the existing EMIT CHANGES statement.

Choice between (a) or (b):

a) a query over a stream always considers past and future data and thus it does not terminate by default
b) a query over a stream always considers only future data and thus it does not terminate by default

From a purist point of view a query against a stream with no WHERE clause should return all the available data, i.e. it should return historic data.

Like @big-andy-coates, I too favor (a) if we can make it work UX wise. That is, if we can make it easy and simple for the user to express a query for (b).

(b) has the naive syntax of:

SELECT * FROM myStream WHERE ROWTIME > now();

but with the caveats that Andy mentioned, thus:

Maybe we could have WHERE ROWTIME > streamNow(), where streamNow() resolves to the event time of the latest message? In the presence of multiple partitions it resolves to the highest timestamp, or maybe the lowest, or maybe we offer both.

Yeah, that's how I understood how the timestamp->partition-offsets mapping would be used in this context. I don't recall the various candidate naming terms in this KLIP for the "now"/time functions (maybe streamNow() == latest() as @PeterLindner suggested), but the main point is to let the user pick a built-in function that fixes "now as of stream time" at the start of the aforementioned SELECT query.

I would have suggested STREAM_NOW() (which alludes to the wall-clock time function NOW() in the SQL standard), but naming-wise it's not really about the stream, it's about event time. That is, the same function should also be available and make sense conceptually when you use it on a SELECT query against a table.

Unfortunately, the following doesn't roll off the tongue very easily:

SELECT * FROM myStream WHERE ROWTIME > EVENT_TIME_NOW()

Then again, maybe STREAM_NOW() would be acceptable. But I simply don't like that the data structure (STREAM) is used to qualify "now", because the time handling (event time) is different from the data structure.

@big-andy-coates
Contributor

big-andy-coates commented Feb 18, 2020

@miguno

Actually, I have a different opinion as others here because I'd favor (a) as the default behavior for:

SELECT * FROM myStream

I think both Peter and I also went for (a) - so I'm not sure you have a different opinion!

because this behavior is consistent with a SELECT * FROM myTable (a pull query).

Not sure (a) is consistent with a table!

Questions:

  • Is this verbose example/syntax what you'd have in mind? (Though EMIT CHANGES would always be implicit for a stream, right?)

Syntax is up for debate. Mine was just an example. I didn't use EMIT CHANGES as I think one of the points of this KLIP was to remove the EMIT CHANGES syntax.

  • I suppose OF 1 MINUTE always refers to event time, not wallclock time, right? (and ROWTIME = event time)

Yep, it would be stream time, which is event-time based, i.e. once the stream time has advanced 1 minute beyond the upper bound set on ROWTIME, that partition would stop processing new records. Once all partitions have stopped, the query is stopped.

I just want to double-check that whatever syntax we propose here would work with the existing EMIT CHANGES statement.

As above: @mjsax is proposing we remove EMIT CHANGES as a thing.

I would have suggested STREAM_NOW() (which alludes to the wall-clock time function NOW() in the SQL standard), but naming-wise it's not really about the stream, it's about event time. That is, the same function should also be available and make sense conceptually when you use it on a SELECT query against a table.

So 'stream time' is a Kafka-streams concept. It's tracked per-partition and, if memory serves me right, tracks the highest timestamp the partition has seen. Hence it only moves forward. You're right about stream-time being measured along the event-timeline, not wall-clock-timeline.

Maybe event_time_now would be a better name. Especially as we'd not actually be calculating stream-time (as such a calculation would require consuming all the data in the partition). Maybe shorthand et_now or et_latest?

Unfortunately, the following doesn't roll off the tongue very easily:

SELECT * FROM myStream WHERE ROWTIME > EVENT_TIME_NOW()
SELECT * FROM myStream WHERE ROWTIME > ET_NOW();

???

@mjsax
Member Author

mjsax commented Feb 21, 2020

Thanks for the input @miguno. I am a little confused by your comment though. Statement (a) implies that a query does not terminate by default (which is preferred by @PeterLindner, @big-andy-coates, and myself), ie, it does not work like a pull query, but is a push/continuous query. Hence, I am wondering if you mean (c)? On the other hand, later you write that for a (b)-type query the following syntax could be used:

SELECT * FROM myStream WHERE ROWTIME > now();

This actually aligns with (a). If you really prefer (c), additional syntax would be required to tell the query not to terminate. In the (c) model, the query from above would always return an empty result as all old data is filtered out and no new/future data would be considered. Or did you just omit the EMIT CHANGES clause (that this KLIP actually suggests to remove) and you did mean:

SELECT * FROM myStream WHERE ROWTIME > now() EMIT CHANGES;

For this case, the EMIT CHANGES clause would be used to tell the query not to terminate. I am wondering if this syntax would allow to specify a query that terminates in the future? Also one more question: what would this query return:

SELECT * FROM myStream WHERE ROWTIME < now() WITH GRACE PERIOD 1 HOUR;

As there is no future data considered (there is no EMIT CHANGES clause), would the grace period actually be considered?

@big-andy-coates One follow-up question about the grace period: what would happen if a query does not have an upper-bound WHERE clause on ROWTIME (ie, no ROWTIME < x) and a user specifies a grace period? Would this query be valid (and just ignore the grace period) or would we throw an error?

@miguno
Contributor

miguno commented Feb 26, 2020

@big-andy-coates :

I think both Peter and also went for (a) - so I'm not sure you have a different opinion!

Doh, my bad. This was a typo. I meant (c), not (a):

c) a query over a stream only considers past data and it does terminate by default (ie, does not consider future data)

That is, SELECT * FROM myStream should terminate by default.

because this behavior is consistent with a SELECT * FROM myTable (a pull query).

@big-andy-coates wrote: Not sure (a) is consistent with a table!

You are right, (a) is inconsistent with a table. But (c) is not. :-)

@miguno
Contributor

miguno commented Feb 26, 2020

@mjsax: Regarding EMIT CHANGES and its use/purpose in the syntax.

I see EMIT CHANGES as a means to define the output behavior of the query (e.g., do I want a single, final update vs. 'intermediate' updates), i.e., what should the materialization behavior be for that query. This behavior is orthogonal to the query's behavior of 'terminate' vs. 'run continuously'.

(FWIW, when EMIT CHANGES was originally introduced to ksqlDB's syntax, I was already raising my concerns about conflating these two different query characteristics in EMIT CHANGES.)

@big-andy-coates wrote:

As above: @mjsax is proposing we remove EMIT CHANGES as a thing.

Perhaps the problem with EMIT CHANGES is its current use in the ksqlDB grammar, which is conflated. But I don't understand why we should remove it altogether. If we were to remove EMIT CHANGES, how would users be able to define the output behavior of a query, e.g., 'single, final update' vs. 'all intermediate updates including the final update'?

@purplefox
Contributor

Sorry for the late reply to this… and apologies in advance for opening this can of worms again!

It strikes me our current mental model and also the one proposed here are quite complex. I think we can create a simpler model while retaining the expressiveness we need (i.e. it should support all the use cases we care about).

My preference is for a “tables only” model, well actually a “materialized views” only model, but the key point being there is no first class abstraction of “stream”. What we currently call streams and tables are both just materialized views in this model.

This model has several advantages including:

  • Dramatically simpler mental model which will be very familiar to anyone with RDBMS experience
  • Standard SQL, no confusing extensions like “emit changes” required

I also think that adopting views can lead to a simpler implementation which has several further advantages including

  • Materialized views don’t need any Kafka topics to back them. Queries can be run from memory and state persisted to local storage only e.g. using RocksDB for durability.
  • Much simpler migrations as materialized views are not backed by topics. You can use a more standard RDBMS style migration.
  • Pull queries just select from the view as you would expect in any RDBMS.
  • Push queries are just materialized views that are created on the fly and dropped when not needed any more.

@PeterLindner

Termination of the query should be determined by the presence of the upper bound on the pseudo-monotonically increasing ROWTIME column and some definition of a grace period, e.g.

SELECT * FROM FOO 
  WHERE 1534756000 <= ROWTIME AND ROWTIME <= 1534757000 
  EMIT WITH GRACE PERIOD OF 1 MINUTE;

@big-andy-coates would it make sense to define grace period with a function? like:

SELECT * FROM WITH_GRACE(FOO, 1 MINUTE)
   WHERE 1534756000 <= ROWTIME AND ROWTIME <= 1534757000

the benefits would be:

CREATE TABLE T as SELECT
   stuff 
FROM TUMBLING(
   WITH_GRACE(
      S,
      1 MINUTE
   ),
   1 SECOND
) 
GROUP BY something, windowstart, windowend;

the downside would be that the syntax is not so fluent, but maybe you guys can come up with a better one.
TBH I haven't thought that through fully and don't know what exactly this function would return, but I hope I could inspire you.


My preference is for a “tables only” model, well actually a “materialized views” only model, but the key point being there is no first class abstraction of “stream”. What we currently call streams and tables are both just materialized views in this model. [...] Materialized views don’t need any Kafka topics to back them. Queries can be run from memory and state persisted to local storage only e.g. using RocksDB for durability

@purplefox I have also thought about whether a stream abstraction is needed or not, didn't come to a conclusion though. I'm neither a Kafka nor a ksqlDB expert, but these questions come to my mind when I think about it:

  1. how would you deal with situations where it is either not needed or infeasible to materialize state?
  2. how would you reprocess data when only current state is stored?
  3. how would you mutate state? can you insert values into materialized views? this klip actually exists because this has consistency problems with aggregates
  4. how would you stream data back to other applications via Kafka (not using the ksqlDB client to reduce coupling) if there are no backing topics?
  5. would schema evolution guarantees controlled by schema registry still apply?
  6. what qualifies ksqlDB as a streaming database if it has no stream abstraction and why would you choose it over any other well established RDBMS together with Kafka Connect then?

@purplefox
Contributor

purplefox commented Mar 7, 2020

Hi Peter

@purplefox I have also thought about whether a stream abstraction is needed or not, didn't come to a conclusion though. I'm neither a Kafka nor a ksqlDB expert, but these questions come to my mind when I think about it:

  1. how would you deal with situations where it is either not needed or infeasible to materialize state?

Can you elaborate on such a situation?

If you're referring to a stateless streaming operation, e.g. a stateless filter then that can also be implemented as a materialized view, e.g.

create materialized view uk_employees as
select * from employees where country='UK'

then you would send the results to a Kafka sink topic or have them consumed by another materialized view

In the case of a non-aggregate materialized view it's not very useful to keep a lot of rows in the view as you're just passing them on, so you could limit the number of rows retained via config to a small number; any older rows are automatically dropped.

  2. how would you reprocess data when only current state is stored?

Perhaps you could explain this use case in more detail?

  3. how would you mutate state? can you insert values into materialized views? this klip actually exists because this has consistency problems with aggregates

To avoid consistency issues related to manually inserting/updating materialized views we could have two different "table" types - one a straight table which you can insert/update/delete like any RDBMS table and materialized views. Internally they would be implemented the same way.

  4. how would you stream data back to other applications via Kafka (not using the ksqlDB client to reduce coupling) if there are no backing topics?

You would create sinks - a sink is a Kafka topic that receives all the updates from a materialized view.

So basically you have source Kafka topics which feed the materialized views. Materialized views can select from sources or from other materialized views.

Then you have sinks which take changes from materialized views and spit them into Kafka topics.

But there are no other topics used - no repartition topics and no backing topics for tables. The tables themselves are the source of truth. In a clustered setup the tables are sharded; when selecting for an aggregate we can basically reshard the rows and send them to the owner nodes in the cluster so they can be inserted - this can be done directly between DB nodes so we don't need repartition topics.

Not having backing topics or repartition topics will also make table migrations easier. We can just do a familiar RDBMS style "amend table" type operation without worrying about data that might live in backing or repartition topics and trying to figure out how to get that to work with updated queries (which is a massive pain that we are trying to find solutions to in ksqlDB currently).

  5. would schema evolution guarantees controlled by schema registry still apply?

It wouldn't use Kafka topics as the source of truth so I'm not sure it would care... or that it would be relevant, but perhaps I haven't understood the question properly.

  6. what qualifies ksqlDB as a streaming database if it has no stream abstraction and why would you choose it over any other well established RDBMS together with Kafka Connect then?

To me the difference between a traditional RDBMS and a streaming database is that a traditional RDBMS only supports point-in-time queries (i.e. give me the results of this query right now), whereas a streaming database also supports continuous queries (also called push queries) where you get the current state but the query remains "open" and you also receive any updates to the query results until you close the query. To me a streaming database is about the types of queries it supports and has nothing to do with having a first-class concept of "stream" alongside "table" (that's both unnecessary and confusing imho).
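For reference, in today's ksqlDB syntax that distinction looks roughly like this (reusing the uk_employees view from above; the employee_id column is assumed, and current pull queries come with additional restrictions such as key lookups on materialized tables):

```sql
-- point-in-time (pull) query: returns the current result and completes
SELECT * FROM uk_employees WHERE employee_id = '42';

-- continuous (push) query: returns matching rows and then stays open,
-- streaming every subsequent change until the client closes the query
SELECT * FROM uk_employees EMIT CHANGES;
```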

@mjsax
Member Author

mjsax commented Mar 7, 2020

@purplefox @PeterLindner Your discussion seems to go a little sideways. The idea of a "table only" model is not proposed in this KLIP -- if you want to suggest something like this, feel free to do a separate proposal (I have no intent to consider a table-only model in this KLIP). The discussion about "source of truth" (ie, local disk within ksql-server vs changelog topics) is a non-semantic implementation discussion and not related at all (neither to this KLIP nor to a table-only model). This KLIP is already complex by itself, and I would propose to avoid getting side-tracked.

@miguno Thanks for clarifying that you prefer proposal (c). I agree that query termination/non-termination and output behavior are two different things. This KLIP does not talk about output behavior at all; we only discuss termination behavior. Hence, I restate my question: if all queries terminate by default, what syntax do you propose to change the termination criteria to "run forever"? Currently, EMIT CHANGES is used for this, and given the current design of KSQL, from my understanding EMIT CHANGES can only change the termination/non-termination behavior of a query -- I don't think that KSQL has any syntax to change the output behavior atm. Please correct me if I am wrong. (Note that KSQL currently only supports emitting of intermediate results; there is a KLIP to add support for SUPPRESS though, but currently EMIT CHANGES seems to only alter the termination behavior.)

I would like to exclude a discussion about output behavior for now and only focus on termination criteria. Curious to see what syntax you have in mind for a non-terminating filter query over an input stream.

@PeterLindner Not sure if I understand the table-function proposal? How would this work for a stream-filter query, or a windowed stream-stream join?

@purplefox
Contributor

@purplefox @PeterLindner Your discussion seems to go a little sideways. The idea of a "table only" model is not proposed in this KLIP -- if you want to suggest something like this, feel free to open a separate proposal (I have no intent to consider a table-only model in this KLIP). The discussion about "source of truth" (ie, local disk within ksql-server vs changelog topics) is a non-semantic implementation discussion and not related at all (neither to this KLIP nor to a table-only model). This KLIP is already complex by itself, so I would propose to avoid getting side-tracked.

Fair point! And I apologise for hijacking the discussion ;) I will consider starting a new KLIP. However, I still think it is relevant to this KLIP to point out that much of the complexity in this discussion disappears when thinking in terms of tables only.

@PeterLindner

@PeterLindner Not sure if I understand the table-function proposal? How would this work for a stream-filter query, or a windowed stream-stream join?

I guess it's meant to be more of a stream-function than a table-function. Since EMIT CHANGES should be removed in this KLIP (is this still the case?) and adding new syntax generally seems undesirable, I tried to come up with an ANSI-SQL-compliant way.
The basic idea was that WITH_GRACE(<stream>, <grace>) would return a stream omitting all out-of-order messages that arrive later than <grace>. Based on that, the engine could figure out when the full result set has been returned and terminate.

-- terminates when stream time > t_end + grace because the full result set has already been returned
SELECT * FROM WITH_GRACE(<stream>, <grace>)
   WHERE ROWTIME BETWEEN <t_start> AND <t_end>

-- does not terminate because the result set is unbounded
SELECT * FROM WITH_GRACE(<stream>, <grace>)

Notice that derived streams would naturally inherit the grace until the timestamp is changed.
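For example, a minimal sketch of that inheritance (using the hypothetical WITH_GRACE function and made-up names):

-- derived stream inherits the 10-second grace of its source
CREATE STREAM clicks_cleaned AS
   SELECT * FROM WITH_GRACE(clicks, 10 SECONDS)
   WHERE user_id IS NOT NULL;

-- a time-range query over the derived stream could terminate once
-- stream time passes <t_end> plus the inherited grace
SELECT * FROM clicks_cleaned
   WHERE ROWTIME BETWEEN <t_start> AND <t_end>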

The engine could also figure out when the full result set for a given window has been returned and close that window.
Similarly for windowed stream-stream joins the engine can figure out when old state can safely be deleted.

I don't know if this is too implicit, though, and confusing for the user.

@mjsax
Member Author

mjsax commented Mar 8, 2020

@purplefox No worries. All good. I am not sure if most of the complexity comes from streams vs tables though, even if I agree that it adds some open questions... (Also, in a table-only model you need to figure out how to specify grace periods, output behavior, etc. -- and how can a stream-table join, which is asymmetric, be done in a table-only model?) However, my personal take is that modelling streams as tables is not desirable: I guess you can do it, but it raises the semantic question of whether you should do it. The idea of streams and tables is to model facts and state explicitly using two first-class abstractions -- this makes the semantics of the data explicit. Facts occur/happen and are immutable, while state is valid over a period of time and changes over time. If you model everything as a table, you lose this expressiveness. In fact, I believe that the idea of a "fact table" as used in DW shows this "issue" -- there should be a fact stream instead :)
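To make the asymmetry concrete, here is a sketch in ksqlDB-style syntax (all names made up): in a stream-table join, each arriving stream record triggers a lookup into the current table state, while a table update by itself produces no output row.

-- facts as a stream, state as a table
CREATE STREAM payments (user_id VARCHAR KEY, amount DOUBLE)
   WITH (kafka_topic='payments', value_format='JSON');
CREATE TABLE balances (user_id VARCHAR PRIMARY KEY, balance DOUBLE)
   WITH (kafka_topic='balances', value_format='JSON');

-- asymmetric join: only payment events emit results; balance updates alone do not
SELECT p.user_id, p.amount, b.balance
FROM payments p
LEFT JOIN balances b ON p.user_id = b.user_id
EMIT CHANGES;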

@PeterLindner Interesting idea. A stream-function would attach the grace period to the stream. I always thought of the grace period as a property of a (windowed) operator though. Why would a stream need a grace period? That seems odd to me: a grace period defines how long to wait, so why would I need to "pre filter" a stream using this stream-function before passing it into the operator? Also, because we are non-ANSI anyway, I think it's ok to add new syntax for stream-specific things -- a "stream function" would not be ANSI-SQL either, as there is no stream concept there.
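For illustration, a sketch of attaching the grace period to the windowed operator via the window-clause (the GRACE PERIOD window option is illustrative syntax here; stream name made up):

-- grace period declared on the windowed aggregation, not on the input stream
SELECT user_id, COUNT(*) AS cnt
FROM pageviews
WINDOW TUMBLING (SIZE 1 MINUTE, GRACE PERIOD 10 SECONDS)
GROUP BY user_id
EMIT CHANGES;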

About removing EMIT CHANGES: I personally still hope that we can remove it (even if I am not religious about it), but it depends on whether I can convince people that, for example, a stream query should not terminate by default. It's still an open discussion, and depending on those underlying design principles we may need to (re-)design the language in one way or another. -- At the same time, as @miguno mentioned, EMIT CHANGES conflates two independent concepts (output and termination behavior) and thus we might need to at least redefine its semantics.

@purplefox
Contributor

purplefox commented Mar 9, 2020

@purplefox No worries. All good. I am not sure if most of the complexity comes from streams vs tables though, even if I agree that it adds some open questions... (Also, in a table-only model you need to figure out how to specify grace periods, output behavior, etc. -- and how can a stream-table join, which is asymmetric, be done in a table-only model?)

I can't see any particular issue with a table-stream join, but quite possibly I haven't understood the point fully. Let's discuss this elsewhere.

However, my personal take is that modelling streams as tables is not desirable: I guess you can do it, but it raises the semantic question of whether you should do it. The idea of streams and tables is to model facts and state explicitly using two first-class abstractions -- this makes the semantics of the data explicit. Facts occur/happen and are immutable, while state is valid over a period of time and changes over time. If you model everything as a table, you lose this expressiveness.

I'm not convinced that it's less expressive. I prefer to think in terms of concrete use cases, and so far I haven't found one that can't be expressed in a tables-only model. But again, there may be something I have missed. Would be good to discuss this further too :)

But... (and I'm not even sure this will be necessary), sometimes it's ok to trade off a bit of expressiveness for better usability and an attractive product offering. Some of the most successful products ever weren't necessarily the ones that had the most complete feature set. Indeed, having too many features or a confusingly complex mental model can sometimes be a real handicap for a product. To me, the goal here is to make a compelling product offering for our users -- something that everyone will want to use, not something that is academically perfect. We don't want our product to be only usable/understandable by those with PhDs in CS! ;)

@PeterLindner

PeterLindner commented Mar 15, 2020

because we are non-ANSI anyway, I think it's ok to add new syntax for stream-specific things -- a "stream function" would not be ANSI-SQL either, as there is no stream concept there

I thought this would be beneficial because an external system (BI-Tool or whatever) could happily construct this query and never know it just queried a stream.

why would I need to "pre filter" a stream using this stream-function before passing it into the operator

It was just an idea to have a more generic composable function. Thinking about it some more, I guess it would make more sense to define grace on every operation that needs it.

Why would a stream need a grace period? That seems odd to me: a grace period defines how long to wait

Well, every stream that is the output of an operation with a grace period has an implicit guarantee of maximum out-of-orderness (is this even a word?). Tracking this as a property of the stream could allow the engine to figure out how long to wait on downstream operations (like termination of a query or suppression of intermediate results). Does that make sense?
However I really dislike the implicit nature of such a property, but couldn't come up with anything better.

EDIT: Maybe the user does not have to be aware of such a property actually.

@mjsax
Member Author

mjsax commented Mar 15, 2020

I thought this would be beneficial because an external system (BI-Tool or whatever) could happily construct this query and never know it just queried a stream.

I doubt that would work, because the result would be a STREAM, and I would expect that those BI tools would not be able to handle that anyway (even if the result for the query might be finite in this case).

it would make more sense to define grace on every operation that needs it.

Agreed. That is exactly my proposal.

Well, every stream that is the output of an operation with a grace period has an implicit guarantee of maximum out-of-orderness (is this even a word?).

-> I would call it "maximum unorder" :D

Tracking this as a property of the stream could allow the engine to figure out how long to wait on downstream operations (like termination of a query or suppression of intermediate results). Does that make sense?

It does make sense, but:

EDIT: Maybe the user does not have to be aware of such a property actually.

Agreed. This would be some internal implementation detail (something like a "watermark").

@PeterLindner

@mjsax I guess there are still some pieces missing from my mental puzzle.

#4442 suggests that windowing performs an implicit grouping by the window bounds and can therefore be split into a table function (or, I think, actually a stream function) that emits rows together with the window bounds, plus an explicit GROUP BY.
Andy gives the following example:

CREATE TABLE T AS 
SELECT 
   stuff 
FROM Tumbling(S, SIZE 1 SECOND) 
GROUP BY something, windowstart, windowend;

But to me it looks like the following should yield the exact same result:
(note that we're still on 5.3.1, so pardon me if I used EMIT CHANGES wrong)

CREATE STREAM tumbling_S AS 
SELECT 
   *, 
   FLOOR(rowtime / 1000) * 1000 AS windowstart,
   -- note: rowtime / 1000 is integer division, so CEIL(rowtime / 1000) would just
   -- equal FLOOR(rowtime / 1000); the window end is window start + window size
   (FLOOR(rowtime / 1000) + 1) * 1000 AS windowend
FROM S
EMIT CHANGES;

CREATE TABLE T AS 
SELECT 
   stuff 
FROM tumbling_S 
GROUP BY something, windowstart, windowend;

But there is no grace period in this case...
This leads me to believe that grace is (or should be) a property of the aggregation operation and not of the actual windowing -- would you agree with that?

@PeterLindner

I may have found some of the missing puzzle pieces:

defining a grace period is actually just a way to set boundaries on stream time relative to some other time, right?
Wouldn't it be easier to expose stream-time (the one while processing the stream, not the one when the query is submitted; hope this makes sense) and set boundaries for it explicitly?

Let me give some examples (names are verbose for clarity):

-- terminate query
SELECT * FROM stream 
WHERE stream_time() < 1584576000000;
-- or
SELECT * FROM stream
WHERE stream_time() <= latest_timestamp_of_topic_at_query_submission()
-- or
SELECT * FROM stream
WHERE stream_time() <= current_wall_clock_time()

-- terminate time-range query
SELECT * FROM stream 
WHERE rowtime >= 1584576000000
  AND rowtime <= 1584619200000
  AND stream_time() <= 1584619210000;

-- closing any kind of window when bounded by grouping key
SELECT * FROM stream 
WHERE stream_time() < t + 5 MINUTES 
GROUP BY t

-- omit out-of-order records
SELECT * FROM stream
WHERE rowtime > stream_time() - 10 SECONDS

-- select out-of-order records
SELECT * FROM stream
WHERE rowtime <= stream_time() - 10 SECONDS

This has the following benefits:

  1. since stream time is monotonically increasing while scanning a topic, the engine can figure out when the full result set has been returned, which means:
    a) every stream terminates if there is an unambiguous upper bound on stream time
    b) every group is closed if stream time is bounded by at least one of the grouping keys
  2. termination of the query is completely decoupled from ROWTIME
  3. closing groups/windows is completely decoupled from grouping/windowing
  4. out-of-order records can easily be filtered

What do you think?

@mjsax
Member Author

mjsax commented Mar 19, 2020

@PeterLindner Thanks for the follow up.

About your first comment: yes, I agree, it's semantically (almost) the same. The main difference is that your second two queries do not have a concept of retention time and thus the result table would grow unbounded. Having a proper window-clause allows expressing a retention time though. Another difference is that your second two queries cannot be updated to express hopping windows (and I guess also no session windows; not sure about that).

This leads me to believe that grace is (or should be) a property of the aggregation operation and not of the actual windowing -- would you agree with that?

That aligns with my thinking.

About your second comment:

defining a grace period is actually just a way to set boundaries on stream time relative to some other time, right?

Agreed.

Exposing stream_time() is an interesting idea. I am not sure I understand your grouping example, but especially the last two queries are quite neat; I don't think those could be expressed without stream_time(). My first impression is that I like this idea a lot -- it seems to provide similar semantics to what we discussed before, but easier to understand.

@PeterLindner

The main difference is that your second two queries do not have a concept of retention time and thus the result table would grow unbounded. Having a proper window-clause allows expressing a retention time though.

But shouldn't the retention of table rows be a property of the table? This is requested in #3519 anyway.

Not sure if I understand your grouping example

Let me illustrate this with the pseudo windowing from above:

CREATE STREAM tumbling_S AS 
SELECT 
   *, 
   FLOOR(rowtime / 1000) * 1000 AS windowstart,
   (FLOOR(rowtime / 1000) + 1) * 1000 AS windowend
FROM S
EMIT CHANGES;

CREATE TABLE T AS 
SELECT 
   stuff 
FROM tumbling_S
WHERE stream_time() <= windowend + 1000 -- this line is new
GROUP BY something, windowstart, windowend;

Let's consider the first window [0, 1000]. When stream time exceeds 2000, no subsequent row can meet the filtering condition for this window anymore, so the window is closed. Does this make sense?

Another difference is that your second two queries cannot be updated to express hopping windows

CREATE STREAM hopping_S AS 
SELECT 
   *, 
   EXPLODE( ARRAY[
      (FLOOR(rowtime / 1000) - 1) * 1000,
      FLOOR(rowtime / 1000) * 1000
   ]) AS windowstart,
   -- again FLOOR-based, since rowtime / 1000 is integer division
   EXPLODE( ARRAY[
      (FLOOR(rowtime / 1000) + 1) * 1000,
      (FLOOR(rowtime / 1000) + 2) * 1000
   ]) AS windowend
FROM S
EMIT CHANGES;

This should do the job, but I'd much prefer a function. Session windowing may be possible as well, but it is much more complicated since it involves state and contractions... but I guess that's not relevant to this KLIP.
However, what I'm trying to convey is that people will come up with creative solutions if the system is that flexible (e.g., aligning daily windows with local time, which is much requested, would be quite easy -- or at least could be worked around until a built-in function supports it).

@mjsax
Member Author

mjsax commented Mar 21, 2020

But shouldn't the retention of table rows be a property of the table?

Agreed, but I am not sure if a non-windowed table should have a retention time at all; if yes, it should be optional, while for a windowed table a retention time should be mandatory IMHO. With a window-clause we know that a retention time is mandatory and can enforce that the user specifies it -- otherwise we can't really do this, because it's very hard to infer from the query that the result is a windowed table. Btw: the latest 0.8.0 release supports this now: #4157
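For example, with a window-clause the retention can be stated explicitly (a sketch, reusing the names from the queries above):

-- retention declared on the window itself, so the result table does not grow unbounded
SELECT something, COUNT(*) AS cnt
FROM S
WINDOW TUMBLING (SIZE 1 SECOND, RETENTION 7 DAYS)
GROUP BY something
EMIT CHANGES;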

Thanks for the additional examples.

Let's consider the first window [0, 1000]. When stream time exceeds 2000, no subsequent row can meet the filtering condition for this window anymore, so the window is closed. Does this make sense?

On a high level it does make sense; however, for the query you present, I am not really sure how the engine should understand that a windowed aggregation is executed to begin with. It would require very complex internal query rewriting/optimization to detect this case, especially if you want to allow users to specify other types of windows, as you mentioned.

There is also KLIP-10, which discusses emit strategies. How could we combine your proposal with emit strategies? For Kafka Streams there is also https://issues.apache.org/jira/browse/KAFKA-9647, which would be nice to have for ksql, too.

Hence, overall it seems simpler to me to stick with a proper window-clause instead of going the table-function way. Using table functions might be flexible, but it's hard for the engine to optimize (lack of semantic context), and the query is also much harder to read! A window-clause makes the semantics much easier to grok IMHO.

@PeterLindner

Hence, overall it seems simpler to me to stick with a proper window-clause instead of going the table-function way. Using table functions might be flexible, but it's hard for the engine to optimize (lack of semantic context), and the query is also much harder to read! A window-clause makes the semantics much easier to grok IMHO.

Agreed, but for completeness, some answers to your questions:

I am not really sure how the engine should understand that a windowed aggregation is executed to begin with.

Well, I guess the assumption of the proposal was that windowed aggregations are just like any other aggregation (which probably is not the case in Kafka Streams, for performance reasons). Indeed, that would be very hard to differentiate; the one binary condition would probably be that at least one of the grouping keys has a lower bound that involves stream time.

Also, the pseudo windowing queries above are actually valid now (apart from stream time), so I guess with #3519 windowing could be worked around anyway.

How could we combine your proposal with emit strategies?

I have tried to wrap my head around that as well; the only thing I could come up with was another wrapping stateful function, like SELECT * FROM SUPPRESS(<windowed query>), that magically only outputs rows when windows are closed.

The important question is: are emit strategies only viable for windows or are they a more general concept?

I personally like to think of time-range queries as singleton windows, and I believe all the windowing semantics also apply to time-range queries, or maybe even to any query (this is actually where my proposal comes from). We already found out that queries need a grace period to close (terminate), and I think the emit strategies would also apply (at least to aggregated time-range queries). Especially KIP-557 would make some neat queries possible (e.g., with the new LATEST_BY_OFFSET function)
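For example, an aggregated time-range query of that kind could look like this (a sketch; the stream and column names are made up):

-- with KIP-557 style "emit on change" semantics, this would only emit a row
-- when the latest reading of a sensor actually changes
SELECT sensor_id, LATEST_BY_OFFSET(reading) AS last_reading
FROM sensor_readings
WHERE ROWTIME BETWEEN 1584576000000 AND 1584619200000
GROUP BY sensor_id
EMIT CHANGES;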

@mjsax
Member Author

mjsax commented Mar 25, 2020

The important question is: are emit strategies only viable for windows or are they a more general concept?

I think it's more general. For example, you could have a non-windowed aggregation query (or a table-table join) and subscribe to the corresponding output changelog stream -- but your downstream application cannot handle thousands or tens of thousands of updates per second, and you want some kind of "rate control" to reduce the load downstream. This is exactly why Kafka Streams added suppress(), and it seems useful to support this in ksqlDB, too.

@miguno
Contributor

miguno commented Mar 26, 2020

+1 to what @mjsax said.

Also, we should not conflate the use of EMIT: it should be only about defining materialization behavior (e.g., "single, final output" vs. "every intermediate output plus the final output"). It should not serve the dual purpose of defining whether a query is (a) continuously running vs. (b) terminating. The latter behavior is orthogonal to the first.

For example, a user may want to define a windowed aggregation that keeps running (rather than terminating at a specific point in time like 'now'), but separately define whether this aggregation should output only a single, final result per window (rather than producing an output for every new window input).
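A sketch of keeping the two concerns separate (EMIT FINAL is illustrative syntax for "one final output per window", not an existing keyword; names made up):

-- continuously running windowed aggregation that emits one final result per window
SELECT user_id, COUNT(*) AS cnt
FROM pageviews
WINDOW TUMBLING (SIZE 1 MINUTE, GRACE PERIOD 10 SECONDS)
GROUP BY user_id
EMIT FINAL;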

@mjsax
Member Author

mjsax commented Mar 26, 2020

I would not use the term "materialization behavior" but "output behavior" to talk about "single final result" vs "intermediate result" etc.

Materialization (from my POV) is about writing the result into a topic, or about materializing a TABLE into RocksDB. (Atm, not all result tables are materialized into RocksDB; only the changelog is written into an output topic -- hence materialization matters for pull queries and is not a semantic question.) Materialization applies only to persistent queries, while "output behavior" applies to both persistent and transient queries alike.
