
Support SQL window functions #213

Closed
benesch opened this issue Jul 24, 2019 · 13 comments
Labels
A-sql Area: SQL planning C-feature Category: new feature or request docs-impact Issue will result in some docs impact
Comments

@benesch
Member

benesch commented Jul 24, 2019

Add support for SQL window functions: https://en.wikipedia.org/wiki/SQL_window_function

Window functions are incredibly powerful but also incredibly complicated to support. We can spec this issue out further once we have customer requests.

@benesch benesch added A-sql Area: SQL planning C-feature Category: new feature or request labels Jul 24, 2019
@sploiselle sploiselle added the docs-impact Issue will result in some docs impact label Dec 9, 2019
@cuongdo cuongdo added this to the Later milestone Jan 27, 2020
@brianbruggeman

brianbruggeman commented Jul 30, 2020

I think the lack of support for window functions is a showstopper for me.

My use case, if you'll indulge me a little:

I have a stream of Kafka data that I want to annotate with batched data and then create a ranked order based on a scoring algorithm. In this use case, I explicitly only care about the most recently streamed Kafka data (let's put this at about 30k updates per second), grouped by a single field, and I want to join to several tables (they vary in size, but the largest is about 30 million rows). The join itself is well defined and fairly boring, but it starts with a mechanism for windowing the Kafka stream. From a streaming perspective, I think windowing is a critical feature: we already have more than a few use cases where we window our data to reduce it, so that it's both relevant and can be transformed within a reasonable time.

@benesch
Member Author

benesch commented Jul 30, 2020

What are the particular window functions that you need? Can you share specific queries or query fragments? From your description it seems like perhaps you need what's described in #978, rather than window functions.

@brianbruggeman

brianbruggeman commented Jul 30, 2020

#978 is definitely one of our use cases. We'd like to have a 5-minute aggregation window so we can identify anomalies in one of our Kafka streams.

However, the use case I'm using to evaluate Materialize would require a window function. I have a stream of essentially log data of transition states, and I need the last transition. I care much less about when that log was received and much more that I have captured the most recent value, though I suspect a timeout after which the data is no longer valid might be reasonable. So it's not merely a window of +/- some time. Kafka Streams has a mode that allows for exactly this kind of use case; I believe they call it a KTable. I really want that last update to persist for an extended time relative to the data stream.

Using a pseudo-schema like this:

{
  "originating-id": "uuid-string", 
  "timestamp": "iso8601 value",  
  "metadata01": "uuid used as a lookup with later table joins"
}

Here's a simplified example of the data:

["a", "2020-07-30T11:51:27+00:00", "123"]
["b", "2020-07-30T11:49:22+00:00", "123"]
["a", "2020-07-30T12:03:41+00:00", "456"]

At 12:00 UTC, I expect to have the following persisted and available through a SELECT:

["a", "2020-07-30T11:51:27+00:00", "123"]
["b", "2020-07-30T11:49:22+00:00", "123"]

At 12:04 UTC, I expect to have the following persisted and available through a SELECT:

["a", "2020-07-30T12:03:41+00:00", "456"]
["b", "2020-07-30T11:49:22+00:00", "123"]

Assuming there were no further changes, I'd expect those values to persist indefinitely (or until a timeout occurs). And to throw one more thing in here, I want a way of removing the data with more data:

A new event:

["b", "2020-07-30T12:07:15+00:00", null]

And now the persisted state at 12:08 UTC only includes the one entry:

["a", "2020-07-30T12:03:41+00:00", "456"]

Edit: I realize that I didn't explicitly identify the Postgres window functions...

In this specific example, I would have used last_value: https://www.postgresql.org/docs/12/functions-window.html
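As a sanity check of the pattern this comment describes, here is a small sqlite3 sketch (SQLite ≥ 3.25 supports window functions, standing in for PostgreSQL/Materialize). It uses `ROW_NUMBER()` rather than `last_value` to pick the latest row per key, and treats a NULL payload as a deletion; the table and column names are hypothetical, taken from the pseudo-schema above.

```python
import sqlite3

# Sketch of "keep only the most recent value per key" using a window
# function. Requires SQLite >= 3.25 (bundled with modern Python builds).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id TEXT, ts TEXT, metadata01 TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [
        ("a", "2020-07-30T11:51:27+00:00", "123"),
        ("b", "2020-07-30T11:49:22+00:00", "123"),
        ("a", "2020-07-30T12:03:41+00:00", "456"),
    ],
)
# Rank rows per id by timestamp (ISO 8601 strings sort correctly),
# keep the newest, and drop keys whose latest payload is NULL.
rows = conn.execute("""
    SELECT id, ts, metadata01
    FROM (
        SELECT id, ts, metadata01,
               ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS rn
        FROM events
    )
    WHERE rn = 1 AND metadata01 IS NOT NULL
    ORDER BY id
""").fetchall()
print(rows)
```

This yields one row per key with its latest payload, matching the "KTable"-style state described above.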

@frankmcsherry
Contributor

Just a quick thought about this example; I may have overfit to it. What you are describing looks like an "upsert", where there is a key, and a payload that either has data or a null. Materialize can handle upserts on ingestion, using the UPSERT envelope. It has some constraints, for example it uses the Kafka key (this roughly tracks their recommendation for key-based compaction) and isn't a window function that you can use in SQL.

Window functions done generally are tricky to support well incrementally (possible, but painful, afaict). At the same time, some large fraction of uses of window functions can be handled by other idioms (a 5-minute sliding window, for example, can be effected by retracting each input record 5 minutes after you insert it).

At the moment, I think we are trying to learn about whether it is a better strategy to try to pilot users of window functions to more efficient idioms (a certain type of work), and which use cases remain once we've done that.

@oaosman84

To add another use case in here, imagine a stream of transactions in financial accounts, where there's a running balance of all transactions up to and including the current one.

With a PostgreSQL window function, this could be as easy as:

SELECT
    id,
    created_at,
    amount,
    account_id,
    sum(amount) OVER (PARTITION BY account_id ORDER BY created_at ASC) AS running_balance
FROM
    transaction
ORDER BY
    created_at ASC
LIMIT 10;

There are other ways to maintain/derive that running balance but it's a breeze with window functions.
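To make the running-balance idea concrete, here is a minimal sqlite3 sketch of the same query (SQLite standing in for PostgreSQL; the transactions are made-up sample data):

```python
import sqlite3

# Running balance per account via SUM(...) OVER (PARTITION BY ... ORDER BY ...).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE txn (id INTEGER, account_id INTEGER, created_at TEXT, amount REAL)"
)
conn.executemany(
    "INSERT INTO txn VALUES (?, ?, ?, ?)",
    [
        (1, 1, "2022-01-01", 100.0),
        (2, 1, "2022-01-02", -30.0),
        (3, 2, "2022-01-01", 50.0),
        (4, 1, "2022-01-03", 25.0),
    ],
)
rows = conn.execute("""
    SELECT id, account_id, amount,
           SUM(amount) OVER (
               PARTITION BY account_id ORDER BY created_at ASC
           ) AS running_balance
    FROM txn
    ORDER BY account_id, created_at
""").fetchall()
for row in rows:
    print(row)
```

Each account's rows carry a cumulative sum of all its transactions up to and including the current one (e.g. account 1 ends at 100 - 30 + 25 = 95).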

@rbaronenugent

I'm just going to chime in here and say that window functions are critical for my use case. I can expand a little on why:

Window functions would make lateral joins much more powerful. Borrowing from the example in the docs here: https://materialize.com/lateral-joins-and-demand-driven-queries/.

In that example, suppose that we wanted to just get the largest city in each state. OK, easy - we'd simply change the limit to "LIMIT 1".

But now suppose that the population was a small integer representing the number of millions of people, rounded to the nearest million (so 3979576 -> 4, 1680992 -> 2, 2320268 -> 2, etc.). If we wanted our lateral join to output all of the "largest" cities (by the new integer measure), we would need to invoke some rank() function, which is only possible with window functions.

I'm also interested to know if you can see a simpler query that would achieve the same output if I'm missing something.

@frankmcsherry
Contributor

I think what you are looking for is even easier (computationally) than the example linked. Rather than do a LIMIT k for each distinct state, you find the maximum population by state, and just use an equality in the lateral join. No window required, afaict:

-- for each state, the cities (plural) with the maximum population
SELECT state, name, pop
FROM
    (   -- each state and its maximum population
        SELECT state, MAX(pop) as max_pop
        FROM cities 
        GROUP BY state 
    ) grp,
    LATERAL (
        -- each city that matches the state and its maximum population
        SELECT name, pop 
        FROM cities 
        WHERE state = grp.state AND pop = grp.max_pop
    );

I tried it out after falsifying SF's population up towards LA's (SF is so provincial otherwise) and taking pop / 1000000, which seems to have done the right thing on our test data:

 state |     name      |   pop   
-------+---------------+---------
 AZ    | Phoenix       | 1680992
 IL    | Chicago       | 2695598
 TX    | Houston       | 2320268
 NY    | New_York      | 8336817
 CA    | Los_Angeles   | 3979576
 CA    | San_Francisco | 3881549
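The max-per-group rewrite above can be checked outside Materialize too. A minimal sqlite3 sketch (SQLite lacks LATERAL, but the same grouped subquery works as a plain JOIN; the city data here is made up, with populations already divided down to millions):

```python
import sqlite3

# Max-per-group without window functions: join each city against its
# state's maximum population. Ties (two cities at the max) both survive.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cities (state TEXT, name TEXT, pop INTEGER)")
conn.executemany(
    "INSERT INTO cities VALUES (?, ?, ?)",
    [
        ("CA", "Los_Angeles", 3),
        ("CA", "San_Francisco", 3),
        ("TX", "Houston", 2),
        ("TX", "Dallas", 1),
    ],
)
rows = conn.execute("""
    SELECT c.state, c.name, c.pop
    FROM (SELECT state, MAX(pop) AS max_pop FROM cities GROUP BY state) grp
    JOIN cities c ON c.state = grp.state AND c.pop = grp.max_pop
    ORDER BY c.state, c.name
""").fetchall()
print(rows)
```

Note that both CA cities come out, since they tie at the maximum, which is exactly the behavior the rank()-based formulation was after.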

@frankmcsherry
Contributor

Also, just to make sure the tone is clear: we 100% get that folks want window functions, as they are popular. The main pain point is that they are popular because they are so general, and supporting their generality is hard to do efficiently (i.e. incrementally updating some logic where you assign rank() to each record, do your own crazy logic with that, and then have to re-do it because one new record landed towards the front of the list).

Whenever it looks like I am pushing back, I'm just trying to see how much we can do without window functions: it's more than just implementing a for loop in our reduce code; it's about seeing whether we can pilot folks away from incremental-update antipatterns.

@sebasmagri

While I understand the complications of windowing in a dataflow/streaming context, I think windows are one of the most idiomatic and useful tools for many view-maintenance use cases. It was, for example, one of the most awaited features for Flink, and, not wanting to make useless comparisons, also one of the features that made a lot of people jump in and use it.

Perhaps there's some inspiration we can take from those systems. Flink docs on how windows work can serve as a source of ideas.

I'd like to add an example to the pool, in postgres land:

SELECT
  id,
  ts,
  val AS current_value,
  SUM(val) OVER lifetime AS total,
  SUM(val) OVER yearly   AS year_total,
  SUM(val) OVER monthly  AS month_total,
  SUM(val) OVER weekly   AS week_total,
  SUM(val) OVER daily    AS day_total
FROM my_view
WINDOW
  lifetime AS (),
  yearly AS (
    PARTITION BY id, DATE_TRUNC('year', ts)
  ),
  monthly AS (
    PARTITION BY id, DATE_TRUNC('month', ts)
  ),
  weekly AS (
    PARTITION BY id, DATE_TRUNC('week', ts)
  ),
  daily AS (
    PARTITION BY id, DATE_TRUNC('day', ts)
  );

If it's possible to translate every window-function case into a lateral-join approach, perhaps that translation could be done by the query processor?

@schinckel

LEAD/LAG are also super useful window functions.
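For illustration, a small sqlite3 sketch of LAG (a hypothetical sensor-readings table; SQLite ≥ 3.25 supports LEAD/LAG), computing each reading's delta from the previous one per sensor:

```python
import sqlite3

# LAG(value) fetches the previous row's value within the partition;
# the first row of each sensor has no predecessor, so its delta is NULL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE readings (sensor TEXT, ts INTEGER, value REAL)")
conn.executemany(
    "INSERT INTO readings VALUES (?, ?, ?)",
    [("s1", 1, 10.0), ("s1", 2, 12.5), ("s1", 3, 11.0), ("s2", 1, 7.0)],
)
rows = conn.execute("""
    SELECT sensor, ts, value,
           value - LAG(value) OVER (PARTITION BY sensor ORDER BY ts) AS delta
    FROM readings
    ORDER BY sensor, ts
""").fetchall()
print(rows)
```

This kind of "difference from the previous event" query is awkward to express without LEAD/LAG, typically requiring a self-join on adjacent timestamps.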

@benesch
Member Author

benesch commented May 12, 2022

Calling this one done! We've now got support for the most common window functions in the latest unstable version, including lead and lag: https://materialize.com/docs/unstable/sql/functions/#window-func.

Please file requests for additional specific window functions as new issues!

@benesch benesch closed this as completed May 12, 2022
SQL and Coordinator automation moved this from Icebox to Landed May 12, 2022
@deepxg

deepxg commented Sep 3, 2022

Any chance of some more detailed documentation here? Does this mean the 'OVER' clause is now supported, and custom frames?

@carfield

carfield commented Feb 8, 2023

> Any chance of some more detailed documentation here? Does this mean the 'OVER' clause is now supported, and custom frames?

Yeah, it looks like it is here: https://materialize.com/docs/sql/functions/
