
Add LATEST() UDAF to support stream->table conversion just like topic->table. #3985

Closed
miguno opened this issue Nov 27, 2019 · 31 comments

@miguno
Contributor

miguno commented Nov 27, 2019

Motivation

It would be very useful to allow users to easily turn a STREAM into the same kind of TABLE that you get when reading a topic into a table. The respective operation, in both cases, is to only remember the latest value seen per key in the stream/topic.

Today, this stream->table conversion (which is an aggregation) is not directly possible, and the workaround is to (1) find the stream's underlying topic, e.g. via DESCRIBE myStream, and then (2) create a new table from that topic via a CREATE TABLE statement.
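For illustration, a sketch of this workaround in ksqlDB syntax; the stream name, topic name, columns, and formats below are hypothetical, the schema has to be redeclared by hand, and the exact syntax varies across ksqlDB versions:

-- Step 1: find the stream's underlying Kafka topic (shown in the DESCRIBE EXTENDED output).
DESCRIBE EXTENDED myStream;

-- Step 2: create a table directly on that topic, redeclaring the schema manually.
CREATE TABLE myTable (id VARCHAR, val DOUBLE)
  WITH (kafka_topic='my_topic', value_format='JSON', key='id');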

Note: The opposite direction table->stream is very easy for a user:

CREATE STREAM changeStream AS SELECT * FROM myTable EMIT CHANGES;

Suggestion

This stream->table conversion could be achieved by providing a new aggregation function called LATEST().

In Kafka Streams this would be something like:

KStream<K, V> stream = builder.stream(...);
stream
  .groupByKey()
  .reduce((v1, v2) -> v2);  // the equivalent of a LATEST() aggregation

Mocked ksqlDB example:

-- From quickstart
CREATE STREAM riderLocations
  (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
  WITH (kafka_topic='locations', ...);

-- Create a 'default' table from the stream = same as creating a table from the stream's underlying topic (here: the 'locations' topic).
CREATE TABLE latestLocations AS
  SELECT LATEST(profileId), LATEST(latitude), LATEST(longitude)
  FROM riderLocations
  GROUP BY profileId
  EMIT CHANGES;

-- Shorthand
CREATE TABLE latestLocations AS
  SELECT LATEST(*)
  FROM riderLocations
  GROUP BY profileId
  EMIT CHANGES;
@miguno
Contributor Author

miguno commented Nov 27, 2019

cc @rmoff

@rmoff
Contributor

rmoff commented Nov 27, 2019

As also discussed with @MichaelDrogalis and @purplefox

The limitation of the workaround described is that it does not materialise the table, which means that it cannot be used in a pull query. This constrains the narrative that can be told around tables when illustrating their use and how they differ from streams.

@mjsax
Member

mjsax commented Nov 27, 2019

With regard to KLIP-11, this would be addressed by the table(<stream>) operator.

Also note that Kafka Streams will add a stream.toTable() operator, because the groupBy().reduce() construct is not really a good workaround (especially with regard to null/tombstone handling); cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL

@agavra
Contributor

agavra commented Dec 19, 2019

@mjsax - adding some points here based on our offline discussion.

I agree that it shouldn't be used to convert stream->table as is suggested in the original issue description, because tombstones would not be properly handled, but I think LATEST has value independently of stream->table conversions. GROUP BY makes it clear that tombstones will be ignored, and we could use it for something like SELECT ticker, LATEST(stock_price), AVG(stock_price) FROM stocks GROUP BY ticker;. Seems powerful.
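For context, a hedged, self-contained sketch of that example, assuming a hypothetical stocks stream and the proposed LATEST() function (which did not exist at the time):

-- Hypothetical source stream; topic, columns, and format are illustrative only.
CREATE STREAM stocks (ticker VARCHAR, stock_price DOUBLE)
  WITH (kafka_topic='stock-trades', value_format='JSON');

-- The proposed LATEST() used alongside an ordinary aggregate in one GROUP BY query.
CREATE TABLE stock_summary AS
  SELECT ticker, LATEST(stock_price) AS last_price, AVG(stock_price) AS avg_price
  FROM stocks
  GROUP BY ticker
  EMIT CHANGES;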

@blueedgenick
Contributor

blueedgenick commented Dec 20, 2019 via email

@miguno
Contributor Author

miguno commented Jan 9, 2020

@agavra 👍

I agree that it shouldn't be used to convert stream->table as is suggested in the original issue description because tombstones would not be properly handled [...]

What would we need to change/add so that LATEST() worked properly with tombstones?

I really believe we must provide a way to convert a STREAM to a TABLE with the same semantics as one is able to create the TABLE from a topic. It's such a basic operation that it always pains me to see us advocating that users do the low-level workaround of fiddling with topics.

@mjsax
Member

mjsax commented Jan 9, 2020

What would we need to change/add so that LATEST() worked properly with tombstones?

It's not a question of what we need to change, but whether we should support this -- it seems semantically questionable to me. An aggregation does (by its definition -- at least from my understanding) not support deletes, and thus it seems "wrong" to change the current behavior.

I really believe we must provide a way to convert a STREAM to a TABLE with the same semantics as one is able to create the TABLE from a topic.

I agree, and I suggest introducing a table(<stream>) operator to do this (cf. KLIP-11).

@miguno
Contributor Author

miguno commented Feb 4, 2020

An aggregation does (by its definition -- at least from my understanding) not support deletes, and thus it seems "wrong" to change the current behavior.

IMHO the operation is not always an "aggregation" (though in practice it is most of the time). It is an operation that takes N inputs to produce 1 output, in whichever way it achieves that. Maybe I should have titled this issue "Add a LATEST() UDAF", which would have been clearer.

For example, both being UDAFs, there's no conceptual difference between SUM (which is stateful and where we add the latest value to a running count, aka the sum, beginning with a starting value of 0) and LATEST (which is stateless and where we only remember the latest value; this is similar to resetting the running count before "adding" the next latest value).

@miguno miguno changed the title Add LATEST() aggregation to support stream->table conversion just like topic->table. Add LATEST() KDAF to support stream->table conversion just like topic->table. Feb 4, 2020
@miguno miguno changed the title Add LATEST() KDAF to support stream->table conversion just like topic->table. Add LATEST() UDAF to support stream->table conversion just like topic->table. Feb 4, 2020
@mjsax
Member

mjsax commented Feb 4, 2020

Just an update. KStream#toTable() operator is merged and will be included in upcoming AK 2.5 release.

@mjsax
Member

mjsax commented Feb 4, 2020

It is an operation that takes N inputs to produce 1 output

Well, I agree that it produces ONE output. However, if you want to implement deletes, it would need to support ZERO output (or a negative retraction output). Example:

input stream: <A,1><A,2><B,10><B,20><A,null>

output table for SUM:
+-----+-----+
|  A  |  3  |
+-----+-----+
|  B  | 30  |
+-----+-----+

output table for TABLE(stream):
+-----+-----+
|  B  | 20  |
+-----+-----+

This would require emitting NO output for key A, which is not supported, as you stated yourself above.

Similarly, the intermediate tables (for incremental processing) would be

+-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+
|  A  |  1  |   |  A  |  2  |   |  A  |  2  |   |  A  |  2  |   |     |     |
+-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+
|     |     |   |     |     |   |  B  | 10  |   |  B  | 20  |   |  B  | 20  |
+-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+

This requires emitting some negative/retraction output/update when processing the last A-record, and that is not really supported by a UDAF IMHO.

The final result of `LATEST()` could be:

output table for LATEST:
+-----+-----+
|  A  | null|
+-----+-----+
|  B  | 20  |
+-----+-----+

Note that "null" in the INPUT stream (that is not a chagelog but a fact stream) is a fact -- it's not a tombstone and IMHO an aggregation cannot (or should not) reinterpret facts as updates.

Maybe I should have titled this issue "Add a LATEST() UDAF", which would have been clearer.

This would not be better, because UDAF has aggregation in its name.

For example, both being UDAFs, there's no conceptual difference between SUM (which is stateful and where we add the latest value to a running count, aka the sum, beginning with a starting value of 0) and LATEST (which is stateless and where we only remember the latest value; this is similar to resetting the running count before "adding" the next latest value).

I disagree. An aggregation must produce a result record per group, ie, per input key in our case. Hence, resetting the counter to zero would not remove the whole row for that key from the result. However, a proper "delete" would require this (compare my example above for how I think LATEST() would keep <A,null> in the result table).

@apurvam apurvam added this to the 0.8.0 milestone Feb 19, 2020
@derekjn
Contributor

derekjn commented Feb 19, 2020

I think that the original proposal here is a good first step towards solving this problem. It would be a relatively small, non-disruptive change that would give users a straightforward way to accomplish what they're ultimately trying to do. I do agree with @mjsax's sentiment that there may be more elegant and native ways to accomplish this, and I think we should continue to explore those options as we better understand what users are doing with LATEST.

I also agree with @agavra that the primary use case here is in the context of aggregations with a GROUP BY clause. If we treat LATEST this way, it's just another aggregate so I don't believe that deletions are an issue.

I think we should move forward with @miguno's original proposal on this issue.

@agavra
Contributor

agavra commented Feb 19, 2020

It would be a relatively small, non-disruptive change that would give users a straightforward way to accomplish what they're ultimately trying to do.

@derekjn FWIW, it's not that small to get timestamp semantics (e.g. LATEST based on timestamp, not based on offset) because we don't currently accept multiple arguments into a UDAF and we don't have access to timestamp in UDAFs. If we're okay with LATEST based on offset then there's no problem 😂 but that's somewhat arbitrary. Also see #4395 and discussion on ARGMAX

@derekjn
Contributor

derekjn commented Feb 19, 2020

Using offset ordering seems like trouble since we use event time in many places.

I definitely like the idea of ARGMAX since it's more generic. PipelineDB uses keyed_max which is very similar. Perhaps a LATEST call could be rewritten internally to an ARGMAX with ROWTIME.
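To illustrate the rewrite idea, a hypothetical sketch, assuming a two-argument ARGMAX(ordering_expr, value_expr) aggregate that returns the value associated with the maximum ordering expression (no such function existed in ksqlDB at the time, and neither did LATEST):

-- What the user would write with the proposed LATEST():
SELECT ticker, LATEST(stock_price) FROM stocks GROUP BY ticker EMIT CHANGES;

-- What it could be rewritten to internally, keyed on event time:
SELECT ticker, ARGMAX(ROWTIME, stock_price) FROM stocks GROUP BY ticker EMIT CHANGES;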

In any case, adapting the UDAF interface to accept multiple arguments seems worthwhile to me.

@big-andy-coates
Contributor

big-andy-coates commented Feb 20, 2020

I'm in agreement with @mjsax on the difference between an aggregation using LATEST and a way of converting a stream to a table using table(stream). This is an important distinction.

Thinking out loud though, I think with LATEST users should be able to get the desired stream->table functionality, with tombstone support, with something like:

-- input in form <KEY ->VALUE>: KEY has 1 col: `ROWKEY`; VALUE has col `col0`.
-- input stream: <A->1><A->2><B->10><B->20><A->null>
SELECT latest(col0) FROM s 
    GROUP BY ROWKEY 
    HAVING NOT_NULL(latest(col0));

i.e. we use the HAVING clause to remove entries from the table where the latest value for col0 is null.

The intermediate tables (for incremental processing) would be

+-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+
|  A  |  1  |   |  A  |  2  |   |  A  |  2  |   |  A  |  2  |   |     |     |
+-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+
|     |     |   |     |     |   |  B  | 10  |   |  B  | 20  |   |  B  | 20  |
+-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+

I think this can also work if there are multiple columns, e.g.

-- input in form <KEY ->VALUE>: KEY has 1 col: `ROWKEY`; VALUE has cols `col0` & `col1`.
-- input stream: <A->1,x><A->2,y><B->10,x><B->20,x><A->null>
SELECT latest(col0), latest(col1) FROM s 
    GROUP BY ROWKEY 
    HAVING NOT_NULL(latest(col0));

The intermediate tables (for incremental processing) would be

+-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+
|  A  | 1,x |   |  A  | 2,y |   |  A  | 2,y |   |  A  | 2,y |   |     |     |
+-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+
|     |     |   |     |     |   |  B  |10,x |   |  B  |20,x |   |  B  |20,x |
+-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+

And I think it can even deal with source streams where each event contains only partial data, assuming each record always contains at least one non-null field, which is required to differentiate it from a tombstone, e.g.

-- input in form <KEY ->VALUE>: KEY has 1 col: `ROWKEY`; VALUE has cols `col0` & `col1`.
-- input stream: <A->1,null><A->null,x><B->null,x><B->20,null><A->null>
SELECT latest(col0), latest(col1) FROM s 
   GROUP BY ROWKEY 
   HAVING NOT_NULL(latest(col0)) AND NOT_NULL(latest(col1));

The intermediate tables (for incremental processing) would be

+-----+--------+   +-----+--------+   +-----+--------+   +-----+--------+   +-----+--------+
|  A  | 1,null |   |  A  | 1,x    |   |  A  | 1,x    |   |  A  | 1,x    |   |     |        |
+-----+--------+   +-----+--------+   +-----+--------+   +-----+--------+   +-----+--------+
|     |        |   |     |        |   |  B  | null,x |   |  B  | 20,x   |   |  B  |20,x    |
+-----+--------+   +-----+--------+   +-----+--------+   +-----+--------+   +-----+--------+

@mjsax
Member

mjsax commented Feb 21, 2020

Interesting idea -- however, I see some issues. (1) [minor] The result table would not be materialized in the current implementation. (2) From my understanding there is a semantic difference between a tombstone (ie, value = null) and a SQL NULL. I know that ksqlDB translates value == null into setting all corresponding columns (ie, all non-key columns) to NULL. However, I am actually not sure if this is correct for a TABLE (for a STREAM without delete semantics it makes sense, I guess), because a table entry like <key=k,c1=NULL,c2=NULL> is actually a totally valid row in SQL. In general, the question about deletes is unclear to me: even if we use table(stream), how can a user actually express a tombstone? I'm not sure if setting all value columns to NULL would be the right thing to do, as we would lose the ability to store a row with all non-key columns being NULL, which should be supported IMHO.

@rschooley

And I think it can even deal with source streams where each event contains only partial data, assuming each record always contains at least one non-null field, which is required to differentiate it from a tombstone, e.g.

I realize this thread is a technical conversation involving the Kafka engineering team, but I'd like to offer an end-user perspective.

With Event Sourcing I'd want to have a topic of Orders that gets projected into a materialized view containing the final state for consumption/sink.

In support of the partial data use case:
The events in an Order topic may only contain the product_id in certain events (OrderRequested, OrderProductRevised). Later events will have the order_id and data like: {tax_amount: 1.00, event: "OrderTaxCalculated"}, or something along those lines.

In terms of the WIP example above:

SELECT latest(col0), latest(col1) 
FROM s 
GROUP BY ROWKEY 
HAVING NOT_NULL(latest(col0)) AND NOT_NULL(latest(col1));

I don't understand the implications behind the scenes in the Streams API method, but at a glance, AND / OR in the HAVING clause could lead to some strange permutations to ensure the function is run for every message in the source.

It might make more sense to have a separate but very similar function project:

SELECT project(*)
FROM s 
GROUP BY ROWKEY 

Where the backing implementation doesn't only return v2, but does null coalescing to account for any missing keys in the existing or new state.

@derekjn
Contributor

derekjn commented Feb 26, 2020

Thank you all for the interesting ideas. In terms of HAVING, is there a reason we'd want to use a function there and not just a generic boolean expression? e.g.,

SELECT latest(col0) FROM s GROUP BY ROWKEY HAVING latest(col0) IS NOT NULL;

In any case, I think that this HAVING support would be a nice to have but not absolutely necessary for the first iteration of LATEST support (assuming it adds effort). I think that we should move forward with just adding a LATEST aggregate, even if we don't initially support tombstones in tables that use it.

Also, I think that an ARGMAX implementation of this would be fine too.

@purplefox
Contributor

I guess I am missing something here but I don't understand why latest needs to be based on timestamps or offsets.

Surely for latest you just always overwrite any current value for the key in the aggregate as subsequent values for the same key arrive?

@mjsax
Member

mjsax commented Mar 4, 2020

Surely for latest you just always overwrite any current value for the key in the aggregate as subsequent values for the same key arrive?

This would be offset based. (for the same key arrive == offset order)

But if there is out-of-order data in the input topic (or input stream), it seems not ideal to follow offset order: given our temporal semantics, we might want to follow timestamp order rather than offset order.

Does this make sense?

@purplefox
Contributor

Surely for latest you just always overwrite any current value for the key in the aggregate as subsequent values for the same key arrive?

This would be offset based. (for the same key arrive == offset order)

Ah I see, I think I would call that processing order. But I see in most cases processing order would be the same as offset order, unless you're consuming from multiple partitions (Not sure if that ever happens).

But if there is out-of-order data in the input topic (or input stream), it seems not ideal to follow offset order: given our temporal semantics, we might want to follow timestamp order rather than offset order.

My inclination would be to go for offset order, as I would guess that will work for most people. The use cases we care about seem to be things like updating a stock ticker price, an IoT sensor updating its latest temperature reading, or an Uber driver updating his latest position.

IMHO offset-based will work for most people, and it's easier to implement, so why not implement 2 functions (see the sketch below):

  1. Implement a latest_offset() function - latest based on offset - easy to implement and will probably keep most users happy.

Then, if 1 doesn't satisfy users and we get significant demand for it, think about implementing:

  2. Implement a latest_time() function - latest based on timestamp - harder to implement, but no point in wasting time on it if users don't care.
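A hedged sketch of how the two proposed variants might be used, assuming a hypothetical sensor_readings stream (the function names above are proposals, not existing functions):

-- Variant 1: keep the most recently consumed reading per sensor (offset order).
SELECT sensor_id, latest_offset(temperature)
FROM sensor_readings
GROUP BY sensor_id
EMIT CHANGES;

-- Variant 2: keep the reading with the greatest event time per sensor (timestamp order).
SELECT sensor_id, latest_time(temperature)
FROM sensor_readings
GROUP BY sensor_id
EMIT CHANGES;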

@agavra
Contributor

agavra commented Mar 5, 2020

I think adding a latest_by_offset makes sense as it makes it explicit what's happening. While I agree with @purplefox that this can patch the hole for some users, it's important to note that offset order is really only a good proxy for time on source data - as soon as a re-partition comes into play the offset order and time have very little to do with one another.
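To make the repartitioning concern concrete, a hypothetical sketch (the stream, columns, and the proposed LATEST() are illustrative; the source topic is assumed to be partitioned by sensor_id):

-- Source stream, assumed partitioned by sensor_id.
CREATE STREAM sensor_readings (sensor_id VARCHAR, region VARCHAR, temperature DOUBLE)
  WITH (kafka_topic='sensor-readings', value_format='JSON');

-- Grouping by a different column forces a repartition by region; offsets in that
-- internal repartition topic reflect interleaving order across source partitions,
-- not the readings' event times, so "latest by offset" no longer tracks time.
SELECT region, LATEST(temperature)
FROM sensor_readings
GROUP BY region
EMIT CHANGES;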

@mjsax
Member

mjsax commented Mar 5, 2020

Ah I see, I think I would call that processing order. But I see in most cases processing order would be the same as offset order, unless you're consuming from multiple partitions (Not sure if that ever happens).

In Kafka, processing order is always offset order, especially for KSQL, because one cannot read from multiple partitions for this case (pattern subscriptions are not supported AFAIK).

Btw: for Kafka Streams we currently also implement offset order for builder.table() and KStream#toTable(), but we might switch to timestamp order. Note that supporting timestamp order would behave the same if there is no out-of-order data (and offset order is only correct if there is no out-of-order data); hence, using timestamp order has no semantic disadvantages at all.

If we want to do a "shortcut" and only support offset order for now, I would also recommend making that explicit in the name. In the end, the UDF approach is a workaround anyway, as deletes are not supported, and thus a proper solution must follow eventually.

@miguno
Contributor Author

miguno commented Mar 12, 2020

@mjsax regarding (not) supporting tombstones/deletes for tables: why wouldn't we be able to support deletes in KSQL for this purpose?

(The ConfluentJiraBot message above #3985 (comment) was from me when I replied to the GitHub email notification.)

@mjsax
Member

mjsax commented Mar 12, 2020

Discussed above: #3985 (comment)

@purplefox
Contributor

I think adding a latest_by_offset makes sense as it makes it explicit what's happening. While I agree with @purplefox that this can patch the hole for some users, it's important to note that offset order is really only a good proxy for time on source data - as soon as a re-partition comes into play the offset order and time have very little to do with one another.

Hi @agavra could you elaborate on this a bit?

AIUI, if we do something like:

select sensor_id, latest(temperature)
from sensor_readings
group by sensor_id

Then it will repartition by sensor_id, which means all readings for the same sensor_id will be processed by the same node, so offset order is still meaningful.

@mjsax
Member

mjsax commented Mar 16, 2020

Then it will repartition by sensor_id, which means all readings for the same sensor_id will be processed by the same node, so offset order is still meaningful.

Only if the original input data from sensor_readings is partitioned by sensor_id and no repartitioning happens -- otherwise, if we repartition by sensor_id, two sensor readings from different sensor_readings topic-partitions would be written in an unknown order into the repartition topic (and if you re-run the query, those offsets might be different, and so might their relative order). In the original input topic, the offsets of both records have no real meaning (as there is no ordering across partitions; both might even be the same). Thus, if you access the offset, you can either use the "meaningless" offset of sensor_readings or the non-deterministic offset of the repartition topic.

@purplefox
Contributor

Then it will repartition by sensor_id, which means all readings for the same sensor_id will be processed by the same node, so offset order is still meaningful.

Only if the original input data from sensor_readings is partitioned by sensor_id and no repartitioning happens -- otherwise, if we repartition by sensor_id, two sensor readings from different sensor_readings topic-partitions would be written in an unknown order into the repartition topic (and if you re-run the query, those offsets might be different, and so might their relative order). In the original input topic, the offsets of both records have no real meaning (as there is no ordering across partitions; both might even be the same). Thus, if you access the offset, you can either use the "meaningless" offset of sensor_readings or the non-deterministic offset of the repartition topic.

Got it. I think more than likely users will already have their sensor data partitioned by sensor id - it seems like the "obvious" thing to do, so latest_by_offset seems a pragmatic first choice.
As mentioned previously, I suspect there is a minority of users who have data partitioned by some other field - I think we can wait until they complain before implementing the more complex latest-by-rowtime :)

@purplefox
Contributor

#4782

@mjsax
Member

mjsax commented Mar 16, 2020

I guess it's ok to add latest_by_offset() and maybe also latest_by_rowtime() -- however, in the end I personally think both are still "shortcuts" and I would prefer proper syntax in the language. (Note that some issues with regard to ordering would persist even if we add proper syntax in the language. For example, the new KStream#toTable() operator might also insert a repartition topic, and thus there is some non-determinism going on...)

@confluentinc confluentinc deleted a comment from ConfluentJiraBot Mar 18, 2020
@MichaelDrogalis
Contributor

I'm going to close this since it shipped in 0.8.0 under latest_by_offset. If we want more variants of this function, let's open new issues.

Language Features automation moved this from To do to Done Apr 2, 2020
@miguno
Contributor Author

miguno commented Jun 9, 2020

For other readers, this feature is now available (as @MichaelDrogalis said above). See the documentation at https://docs.ksqldb.io/en/latest/how-to-guides/convert-changelog-to-table/ for a full example.
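For completeness, a minimal sketch adapted from the rider-locations example at the top of this issue, using the LATEST_BY_OFFSET function that shipped in 0.8.0 (exact syntax may differ slightly between ksqlDB versions):

CREATE TABLE latestLocations AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude)  AS latitude,
         LATEST_BY_OFFSET(longitude) AS longitude
  FROM riderLocations
  GROUP BY profileId
  EMIT CHANGES;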
