
feat: Implement latest_by_offset() UDAF #4782

Merged: 4 commits into confluentinc:master on Mar 17, 2020

Conversation


@purplefox purplefox commented Mar 16, 2020

Description

Please see #3985

Implements the latest_by_offset() UDAF, which computes the latest value for a column, with "latest" defined by offset order.
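
For illustration, a minimal usage sketch (the READINGS stream and its columns are hypothetical, not part of this PR):

-- Hypothetical stream and columns, for illustration only.
-- latest_by_offset(VALUE) keeps the value from the record with the highest offset per ID.
CREATE STREAM READINGS (ID INT KEY, VALUE DOUBLE) WITH (...);
CREATE TABLE LATEST_READINGS AS
  SELECT ID, latest_by_offset(VALUE) AS LATEST_VALUE
  FROM READINGS
  GROUP BY ID;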

Testing done

Added new unit test and QTT test

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@purplefox purplefox requested review from JimGalasyn and a team as code owners March 16, 2020 17:56

@big-andy-coates big-andy-coates left a comment


Thanks @purplefox, awesome to see this!

Some thoughts (apologies if these have been covered already):

  • Null handling: always ignoring null values seems arbitrary. Some users may want the nulls, others may not. Choosing one means the UDAF doesn't work for the others. Can we not support both? (It may have to be two separate UDAFs until we enhance our UDAF framework ... not sure).
  • Is there any reason why you've not added variants for complex types ARRAY, MAP and STRUCT? Or is this not possible with the current UDAF framework? Ah... missing a UDAF variant of SchemaProvider... boo! ;)

@blueedgenick

I have a couple of points of confusion to raise here:

First (easy one?) is naming: please let's not call this "latest" anything. It may seem like a pedantic linguistic quibble, but "late" means something to do with time, which this function is not really concerned with. I find a useful way to think about this is based on the observation that aggregate functions usually come in pairs, like min/max or lag/lead for example. Once we actually get around to having the long-asked-for latest-record-based-on-timestamp (see e.g. #1128, 2 years old already) then we will want to keep latest and earliest as the names for those. Perhaps we could call the udaf in this PR something like last (and its mirror would be first) or even max_by_offset or something? The current name mixes concepts of time and offset, which are clearly not the same even though they are often correlated.
😀

Second: i'm not sure this function is actually even particularly useful in this form. If we expose it as a general UDAF then folks are going to use it with group by clauses and then often not get the result that they expect. Why ? because group-by of anything other than the key of the input topic messages causes a repartition (and i'm not even 100% certain that we don't repartition for some of those too) - and once you repartition then the order of messages in the topic fed to the UDAF is effectively random because it interleaves messages from the partitions of the input topics which could be at significantly different offsets from each other and consumed at different speeds. I'm really against introducing a general-purpose function like this which only works-as-intended under a very narrow set of conditions but we have no guard-rails in place to prevent a user shooting herself in the foot with it.


@JimGalasyn JimGalasyn left a comment


LGTM, but please put this new function in the docs-md content.

@purplefox

Second: i'm not sure this function is actually even particularly useful in this form. If we expose it as a general UDAF then folks are going to use it with group by clauses and then often not get the result that they expect. Why ? because group-by of anything other than the key of the input topic messages causes a repartition (and i'm not even 100% certain that we don't repartition for some of those too) - and once you repartition then the order of messages in the topic fed to the UDAF is effectively random because it interleaves messages from the partitions of the input topics which could be at significantly different offsets from each other and consumed at different speeds. I'm really against introducing a general-purpose function like this which only works-as-intended under a very narrow set of conditions but we have no guard-rails in place to prevent a user shooting herself in the foot with it.

Hey Nick, not sure I'd agree here.

Thinking about the kinds of use cases where a "latest" aggregate is useful: Latest stock ticker prices, latest reading from an IoT sensor, latest position of a ride share driver... All these require consistency, and for that the data needs to be partitioned on the id (stock_id, sensor_id etc). If the data isn't partitioned on the id, and we implement an aggregate based on rowtime rather than offset order, then data will get repartitioned to the key, resulting in an aggregate that is only eventually consistent, as different partitions may lag one another. This is unlikely to be the behaviour the user requires for a stock price update!

Imho, latest by offset is what most users are going to need. Yes, that means the data needs to be originally partitioned by the id, but that's just the right thing to do for use cases that require consistency (not eventual).
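
To sketch the setup being described (topic and column names are hypothetical), the source data is already keyed by the entity id, so the GROUP BY causes no repartition:

-- Hypothetical example: the prices topic is already partitioned by STOCK_ID,
-- so GROUP BY STOCK_ID needs no repartition and offset order is stable per key.
CREATE STREAM PRICES (STOCK_ID INT KEY, PRICE DOUBLE) WITH (...);
CREATE TABLE LATEST_PRICES AS
  SELECT STOCK_ID, latest_by_offset(PRICE) AS PRICE
  FROM PRICES
  GROUP BY STOCK_ID;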

@purplefox

  • Null handling: always ignoring null values seems arbitrary. Some users may want the nulls, others may not. Choosing one means the UDAF doesn't work for the others. Can we not support both? (It may have to be two separate UDAFs until we enhance our UDAF framework ... not sure).

Imo for the kinds of use cases that this is intended for... stock price updates, IoT sensor updates I suspect ignoring nulls is ok for now (most probably they won't send nulls). I'd suggest going with this for now and creating a new variant which doesn't ignore nulls if there is user demand for it.

  • Is there any reason why you've not added variants for complex types ARRAY, MAP and STRUCT? Or is this not possible with the current UDAF framework? Ah... missing a UDAF variant of SchemaProvider... boo! ;)

Yeah, I think the UDAF framework doesn't support this.


blueedgenick commented Mar 17, 2020 via email


purplefox commented Mar 17, 2020

I'm a pragmatist and I prefer to spend my time finding a useful solution that solves the issue for the 90% in a short amount of time, rather than spending 10 times as long finding a solution that will benefit only a further 10% of users. (Numbers are figurative, obviously ;) )

I think that's particularly important for a project like ksqlDB where we need to encourage adoption above all else. I think we should merge this as I think it will provide value. We can always provide further functions if there is demand at a later date.

Nick - I think you've made valid points even though I don't necessarily agree with them. If you have a strong objection to this then please use your right to veto it by requesting changes on a review, and then it can go back to the debate stage. Otherwise I will merge it.

@purplefox purplefox merged commit 0c13bb0 into confluentinc:master Mar 17, 2020
@blueedgenick

i see this is merged anyway - it's the end of the working day now where you are @purplefox, so i understand. i only came back here to say two quick things:

(1) - you didn't reply to my request for a change in name of the udaf - did that get overlooked?

(2) - re:

If you have a strong objection to this then please use your right to veto it by requesting changes on a review, and then it can go back to the debate stage. Otherwise I will merge it.

i won't engage in this game of "you're either with us or you're against us". It's clear from the originating issue (3985) that there's a variety of evolving opinions around this, including from yesterday. By this standard I could go ahead and find any github issue where there are a variety of opinions, hack up a solution i personally preferred, and then "dare" other community members to stop me merging it.
You are obviously free to do whatever you like - you are as entitled to your own opinions as the rest of us - just explaining why i personally won't play along. Hope that doesn't come across as too aggressive, it's not meant to start a fight.


mjsax commented Mar 17, 2020

I think that there was a long discussion on the GitHub issue pointing out different pros/cons of different aspects, and I don't think that @purplefox just "hacked up a solution he prefers" -- it was more or less the consensus on the issue to do this PR.

However, asking somebody to "vote it down" is not really helpful to find consensus either. Also, to not give people time to respond (also mind the time difference) and to just merge a PR seems undesirable.

Just my 2 cents.

@big-andy-coates

@blueedgenick You raise an interesting point about the reordering. I'd not thought of that! As you say, if KSQL is doing an internal repartition then the ordering by offset may become nondeterministic if there is more than one partition.

Humm....

-- so this would work:
CREATE STREAM READINGS (ID INT KEY, VALUE DOUBLE) WITH (...);
CREATE TABLE T AS SELECT ID, latest_by_offset(VALUE) AS VALUE FROM READINGS GROUP BY ID;

-- but this probably wouldn't, as it's repartitioning:
CREATE STREAM READINGS (K STRING KEY, ID INT, VALUE DOUBLE) WITH (...);
CREATE TABLE T AS SELECT ID, latest_by_offset(VALUE) AS VALUE FROM READINGS GROUP BY ID;

The problem is... how is the user supposed to know this by looking at the query?!? It's not very intuitive.

Problem is, to support a UDAF that actually saves the latest value based on some timestamp, e.g. ROWTIME, we need to enhance our UDAF framework, which is more work. Depending on this enhancement to implement a better/alternative 'latest' style UDAF would delay its release, and is probably the reason it's been 2 years and still no latest.

latest_by_offset will be of use to users if there is no out-of-order data in the source data and if no repartition happens to mess up the ordering. So the question is... are we happy with that many ifs?

Ideally, we'd have arg_max / arg_min impls that would allow us to:

  • arg_max(A, ROWTIME): capture 'A' with the highest ROWTIME, i.e. the latest by event time.
  • arg_max(A, ROWOFFSET): capture A with the highest offset, i.e. what the UDAF in this PR offers.
  • arg_max(A, X): capture A with the highest X, whatever that may be.
  • arg_min equivalents.

But of course, this is more work. Though once we've done that work we'd not really want to keep the oddly behaving latest_by_offset around, although retiring it may be tricky.
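
To make that concrete, a purely hypothetical query using the proposed (not yet implemented) arg_max could look like:

-- Hypothetical: arg_max does not exist; this only illustrates the proposal above.
-- Capture the PRICE from the record with the highest ROWTIME per STOCK_ID.
CREATE TABLE LATEST_PRICES AS
  SELECT STOCK_ID, arg_max(PRICE, ROWTIME) AS PRICE
  FROM PRICES
  GROUP BY STOCK_ID;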

Options I see are:

  1. Go with latest_by_offset but ensure the docs call out its nondeterminism and look to replace it later with arg_max or similar.
  2. Bite the bullet and enhance our UDAF framework to allow us to implement arg_max now and remove latest_by_offset.

@blueedgenick regarding the naming ... do you have any alternative suggestions?

@big-andy-coates

Imo for the kinds of use cases that this is intended for... stock price updates, IoT sensor updates I suspect ignoring nulls is ok for now (most probably they won't send nulls). I'd suggest going with this for now and creating a new variant which doesn't ignore nulls if there is user demand for it.

That's making some big assumptions about people's data IMHO.


mjsax commented Mar 17, 2020

The reordering issue was discussed on the Github issue, too: #3985 (comment)

The main problem is that even with latest_by_timestamp() the issue does not go away, because there might be two records from different partitions with the same new key and the same timestamp.

Hence, we can only make it deterministic if we introduce a proper operation that upserts a stream into a table without changing the key (i.e., something like the table(<stream>) operator or some other syntax). But this was considered too complex to be addressed right now and the "hacky" UDAF shortcut was chosen (and if we choose a hacky shortcut, it does not really matter if we go with latest_by_offset() or latest_by_timestamp(), as both are "broken" anyway -- besides the issue that neither can handle deletes properly).
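
Purely to illustrate the idea (this is sketched, non-existent syntax), such an operator might look something like:

-- Hypothetical, non-existent syntax: upsert the stream into a table on its existing key,
-- with no repartitioning and no aggregate function involved.
CREATE TABLE READINGS_TABLE AS SELECT * FROM TABLE(READINGS);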

Personally, I would never ship a "broken" feature but some people are more pragmatic than I am.


purplefox commented Mar 18, 2020

I think that there was a long discussion on the GitHub issue pointing out different pros/cons of different aspects, and I don't think that @purplefox just "hacked up a solution he prefers" -- it was more or less the consensus on the issue to do this PR.

Indeed.

However, asking somebody to "vote it down" is not really helpful to find consensus either. Also, to not give people time to respond (also mind the time difference) and to just merge a PR seems undesirable.

The PR was already approved before Nick commented. The process, as I understand it, is that if it's approved it can be merged. If someone subsequently has strong objections then they can start a new debate and submit new PRs.

If folk don't like that process let's start a discussion about changing it.

Moreover I specifically gave Nick the opportunity to prevent the merge and a further grace period of a full working day before actually merging it, which I didn't really need to as it was already approved, but I did that anyway.

If you don't want a PR merged then please make that clear, either in a comment or by requesting changes. If that had happened I also wouldn't have merged it. It didn't happen.

Just my 2 cents.

@big-andy-coates

The reordering issue was discussed on the Github issue, too: #3985 (comment)

Good to know - thanks.

The main problem is that even with latest_by_timestamp() the issue does not go away, because there might be two records from different partitions with the same new key and the same timestamp.

Agreed. However, this seems much less of an issue than the nondeterminism of dealing with offsets. Offsets are an artefact of Kafka, whereas the event-time is an artefact of the system being modeled. Hence, in my mind, having a nondeterministic result because the system allows, for example, two IOT sensor readings with the same id, the same event time, but different values, seems acceptable: how would we be able to choose between them with no other data? However, having nondeterminism introduced because we're using offsets and a repartition has transparently happened behind the scenes: that's less than ideal IMHO. Personally, I'd prefer not to have such UDAFs released as they're just going to confuse people or damage their view of KSQL when the result doesn't turn out to be 'correct' as they see it.

Hence, we can only make it deterministic if we introduce a proper operation that upserts a stream into a table without changing the key (i.e., something like the table(<stream>) operator or some other syntax). But this was considered too complex to be addressed right now and the "hacky" UDAF shortcut was chosen (and if we choose a hacky shortcut, it does not really matter if we go with latest_by_offset() or latest_by_timestamp(), as both are "broken" anyway -- besides the issue that neither can handle deletes properly).

Would this fix the situation where an IOT sensor stream wasn't partitioned by the sensor id? If it does, can you explain as I don't follow.

Personally, I would never ship a "broken" feature but some people are more pragmatic than I am.

I hear you. There's a cost involved in maintaining this going forward. Even if we deprecate the UDAF for new queries we'll need to continue on-going support for historic queries already using it. I guess we could drop it at v1.0.

@purplefox

i won't engage in this game of "you're either with us or you're against us". It's clear from the originating issue (3985) that there's a variety of evolving opinions around this, including from yesterday. By this standard I could go ahead and find any github issue where there are a variety of opinions, hack up a solution i personally preferred, and then "dare" other community members to stop me merging it.

Luckily, that's not what happened, and your interpretation seems way off the mark, and a little condescending.


mjsax commented Mar 18, 2020

@purplefox:

The PR was already approved before Nick commented. The process, as I understand it, is that if it's approved it can be merged. If someone subsequently has strong objections then they can start a new debate and submit new PRs.

Even if a PR is approved, IMHO, if somebody raises a concern it should not be ignored.

Moreover I specifically gave Nick the opportunity to prevent the merge and a further grace period of a full working day before actually merging it, which I didn't really need to as it was already approved, but I did that anyway.

I disagree. IIRC, you left a comment and merged the PR about 9 to 12 hours later (if this is your definition of a full working day it might need to be adjusted, because of the time difference -- in a global community you need to give at least 24h for people to reply).

If you don't want a PR merged then please make that clear, either in a comment or by requesting changes. If that had happened I also wouldn't have merged it. It didn't happen.

That is what @blueedgenick did IMHO... He explicitly requested to at least change the name of the UDAF.

@big-andy-coates:

I agree that offset non-determinism is worse than timestamp non-determinism -- and I also think that for this case, we can actually resolve the timestamp non-determinism if we avoid repartitioning.

how would we be able to choose between them with no other data?

Well, if both messages have the same ID and the same timestamp and land in the same partition, the offset can be used as a "tie breaker". If you reprocess the same data from the input topic, this tie breaker will give you a deterministic result (and it would be "correct" assuming that no re-ordering happened when the sensor sent the data to the topic). However, using the offset as a tie breaker in a repartition topic is non-deterministic because each time the query is re-run, the repartition topic is re-populated and thus the offsets change between runs. Does this make sense?

Would this fix the situation where an IOT sensor stream wasn't partitioned by the sensor id? If it does, can you explain as I don't follow.

It does not directly. However, for this case I would recommend a different pattern. Instead of doing auto-repartitioning, a user must repartition the input explicitly, i.e., use two queries. For this case, the repartitioning would be done only once (by the first query) -- and if the second "to-table" query is repeated (but not the first repartitioning query) the result would be deterministic again, as the input to the query did not change. For the case that the repartition query is repeated, we can explain to users that this results in a different input data stream for the second query (because repartitioning is non-deterministic) and thus, if the input data is different, it is not reasonable to ask for the same result. Hence, splitting it into two queries gives us a better way to explain what happens to users, while an internal repartitioning (which a user might be unaware of) introduces some non-determinism the user cannot control. Does this make sense?
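
As a rough sketch of that two-query pattern (stream and column names are illustrative):

-- Query 1: explicit, one-off repartition of the input by the grouping column.
CREATE STREAM READINGS_BY_ID AS SELECT * FROM READINGS PARTITION BY ID;
-- Query 2: aggregate the already-keyed stream; re-running only this query reads the
-- same repartitioned input, so its result is deterministic.
CREATE TABLE LATEST_READINGS AS
  SELECT ID, latest_by_offset(VALUE) AS VALUE
  FROM READINGS_BY_ID
  GROUP BY ID;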
