Projection with window function #54818

Closed
jozefRudy opened this issue Sep 20, 2023 · 19 comments

@jozefRudy

jozefRudy commented Sep 20, 2023

We are working with time series data. We have a raw table for ingestion that typically receives about 100,000 rows per minute (not too much).

CREATE TABLE db.raw_bars
(
    id Int32,
    time DateTime,
    bid_price Nullable(Float64),
    ask_price Nullable(Float64),
    bid_size Nullable(Float64),
    ask_size Nullable(Float64),
    volume Nullable(Float64)
) ENGINE = Null();

On top of this we aggregate into 1-minute intervals.

aggregation table ->

CREATE TABLE db.minute_bars_agg (
    id Int32,
    agg_time DateTime CODEC(Delta, ZSTD),
    agg_last_update AggregateFunction(max, DateTime) CODEC(ZSTD),
    agg_bid_price AggregateFunction(argMax, Nullable(Float64), DateTime) CODEC(ZSTD),
    agg_ask_price AggregateFunction(argMax, Nullable(Float64), DateTime) CODEC(ZSTD),
    agg_bid_size AggregateFunction(argMax, Nullable(Float64), DateTime) CODEC(ZSTD),
    agg_ask_size AggregateFunction(argMax, Nullable(Float64), DateTime) CODEC(ZSTD),
    agg_volume AggregateFunction(sum, Nullable(Float64)) CODEC(ZSTD)
) ENGINE = AggregatingMergeTree 
    PARTITION BY toYYYYMM(agg_time) 
    ORDER BY (id, agg_time);

materialized view ->

CREATE MATERIALIZED VIEW db.minute_bars_mv to db.minute_bars_agg
AS
select
    id,
    toStartOfMinute(time) as agg_time,
    maxState(time) as agg_last_update,
    argMaxState(bid_price, time) as agg_bid_price,
    argMaxState(ask_price, time) as agg_ask_price,
    argMaxState(bid_size, time) as agg_bid_size,
    argMaxState(ask_size, time) as agg_ask_size,    
    sumState(volume) as agg_volume
from db.raw_bars
group by 1, 2;

Our main use case is querying at a certain frequency (e.g. 1-minute or 1-hour) for a given id, which is reasonably quick.

We have an additional query that we are struggling with: the latest minute of data for each id.

To make this quick we added a projection ->

ALTER TABLE db.minute_bars_agg ADD PROJECTION last_items (
SELECT
    id,
    max(agg_time),
    maxMerge(agg_last_update),
    argMaxMerge(agg_bid_price),
    argMaxMerge(agg_ask_price),
    argMaxMerge(agg_bid_size),
    argMaxMerge(agg_ask_size)
GROUP BY id
);

This works as we want (giving the last non-null values for prices and sizes). However, we are struggling with volume. Since we have multiple updates per minute, we use sumState. But when selecting the last minute in a projection it is unfortunately impossible to do something like maxMerge(sumMerge(last_minute)). We do not want to sum volume over the entire table, just over the last minute.

We have spent time on this in various ways, and there are limitations in every direction. We cannot use a window function in a projection, so we fall back to the following query, which is slow ->

WITH filtered_volumes AS (
    SELECT
        id,
        max(agg_time) AS threshold_time
    FROM db.minute_bars_agg
    GROUP BY id
)
SELECT
    d.id,
    max(agg_time) as Time,
    maxMerge(d.agg_last_update) AS LastUpdate,
    argMaxMerge(d.agg_bid_price) AS BidPrice,
    argMaxMerge(d.agg_ask_price) AS AskPrice,
    argMaxMerge(d.agg_bid_size) AS BidSize,
    argMaxMerge(d.agg_ask_size) AS AskSize,
    sumMergeIf(d.agg_volume, d.agg_time >= f.threshold_time) AS LatestVolume
FROM db.minute_bars_agg AS d
JOIN filtered_volumes AS f ON d.id = f.id
WHERE d.agg_time >= f.threshold_time
GROUP BY d.id, f.threshold_time;

So we are able to get the sumMerge of volumes for the last minute, but not in a pre-calculated way.

Is there something we are missing, or is there a feature planned for the reasonably near future that we should wait for?

jozefRudy added the question label Sep 20, 2023
@den-crane
Contributor

den-crane commented Sep 20, 2023

I don't understand what you want with sumMerge; I don't see a sum here. Please provide a simpler example, with a single argMax column and with example data.

I think what you really need is argMaxMergeState in the projection and finalizeAggregation(argMaxMergeState(...)) in the query.

@jozefRudy
Author

A simpler version would remove all the maxMerge columns and keep just one, though I am not sure that makes it much easier to understand.

So again, we are focusing on the volume column. We don't want argMaxMerge for volume, since volume is a sum (within a given minute). I omitted the sum from the projection on purpose, since it is not what we want and it is not clear how it could be added to the projection.

Here it is with the sum included, if that helps understanding:

ALTER TABLE db.minute_bars_agg ADD PROJECTION last_items (
SELECT
    id,
    max(agg_time),
    maxMerge(agg_last_update),
    argMaxMerge(agg_bid_price),
    . . .
    sumMerge(agg_volume)
GROUP BY id
);

for peace of mind.

However, this way we get the latest values for prices (bid_price, etc.) and the sum of all volume over the entire table. We want only the sum of volumes in the LAST minute (so we can't use argMaxMerge). The only workaround would be to compute the accumulating sum within a minute in the application and then, as you say, use argMaxMerge. But again, this is not what I described above.

I also provided a way of obtaining the correct data, which is slow however:

WITH filtered_volumes AS (
    SELECT
        id,
        max(agg_time) AS threshold_time
    FROM db.minute_bars_agg
    GROUP BY id
)
SELECT
    d.id,
    max(agg_time) as Time,
    maxMerge(d.agg_last_update) AS LastUpdate,
    argMaxMerge(d.agg_bid_price) AS BidPrice,
    . . .
    sumMergeIf(d.agg_volume, d.agg_time >= f.threshold_time) AS LatestVolume
FROM db.minute_bars_agg AS d
JOIN filtered_volumes AS f ON d.id = f.id
WHERE d.agg_time >= f.threshold_time
GROUP BY d.id, f.threshold_time;

This gives the result we expect. I am not really sure what you mean by simpler.

I provided the Null table -> materialized view -> aggregate minute table, plus the projection on top of the aggregate minute table, to illustrate what we have and which particular query we struggle with.

I am not really sure how finalizeAggregation could help here, since we are not struggling with finalizing the query.

@den-crane
Contributor

den-crane commented Sep 20, 2023

> only sum of volumes in LAST minute.

I see.
No, that is impossible using projections or materialized views. Their scope is limited to a part / an insert. They are not aware of other data in the table.

I guess Window View should solve it, but it never worked properly.

So I would solve this task outside of ClickHouse.

@jozefRudy
Author

jozefRudy commented Sep 20, 2023

Unfortunately a window view is not an option either, even if it worked properly.

We want the last minute per id, which is not necessarily uniform across all ids or tied to real time: if some id has not been updated for an hour, its last minute is 1 hour old (because no update came), and that 1-hour-old minute is what we want for that id.

Maybe what I asked above could be answered differently: could something like sumMergeIf(d.agg_volume, d.agg_time >= f.threshold_time) AS LatestVolume be optimized further, expressed as a window function, or used directly in a projection? I saw multiple GitHub issues related to optimizing window functions, so maybe in half a year or so that will be much quicker, or supported in materialized views/projections.

Ideally we would not need to calculate cumulative volume ourselves and could leave that to ClickHouse, since then we could take any data source and freely backfill data without having to worry about it.

@den-crane
Contributor

> window function, or used directly in projection

Window functions cannot be used in projections. The scope of a projection calculation is limited to a part.

@amosbird
Collaborator

@jozefRudy You can try the following and see if the performance is acceptable.

CREATE TABLE db.minute_bars_agg (
    id Int32,
    agg_time DateTime CODEC(Delta, ZSTD),
    agg_last_update AggregateFunction(max, DateTime) CODEC(ZSTD),
    agg_bid_price AggregateFunction(argMax, Nullable(Float64), DateTime) CODEC(ZSTD),
    agg_ask_price AggregateFunction(argMax, Nullable(Float64), DateTime) CODEC(ZSTD),
    agg_bid_size AggregateFunction(argMax, Nullable(Float64), DateTime) CODEC(ZSTD),
    agg_ask_size AggregateFunction(argMax, Nullable(Float64), DateTime) CODEC(ZSTD),
    agg_volume SimpleAggregateFunction(sum, Nullable(Float64)) CODEC(ZSTD)
) ENGINE = AggregatingMergeTree 
    PARTITION BY toYYYYMM(agg_time) 
    ORDER BY (id, agg_time);

CREATE MATERIALIZED VIEW db.minute_bars_mv to db.minute_bars_agg
AS
select
    id,
    toStartOfMinute(time) as agg_time,
    maxState(time) as agg_last_update,
    argMaxState(bid_price, time) as agg_bid_price,
    argMaxState(ask_price, time) as agg_ask_price,
    argMaxState(bid_size, time) as agg_bid_size,
    argMaxState(ask_size, time) as agg_ask_size,    
    sumSimpleState(volume) as agg_volume
from db.raw_bars
group by 1, 2;

ALTER TABLE db.minute_bars_agg ADD PROJECTION last_items (
SELECT
    id,
    max(agg_time),
    maxMerge(agg_last_update),
    argMaxMerge(agg_bid_price),
    argMaxMerge(agg_ask_price),
    argMaxMerge(agg_bid_size),
    argMaxMerge(agg_ask_size),
    sumMap([agg_time], [agg_volume])
GROUP BY id
);

Then you can use this query to calculate the last sum:

SELECT
    id,
    arraySort(arrayZip(untuple(sumMap([agg_time], [agg_volume]))))[-1]
FROM db.minute_bars_agg
GROUP BY id
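
As a rough illustration of what this expression computes, here is a self-contained toy query. It is only a sketch: the rows are made up and supplied through the values table function, the volumes are plain Float64 rather than the real db.minute_bars_agg columns, and tupleElement is used in place of untuple purely for readability.

```sql
-- sumMap returns a tuple: (sorted distinct keys, summed values per key).
WITH sumMap([agg_time], [agg_volume]) AS per_minute
SELECT
    id,
    -- Pair each minute with its total, sort by minute, keep the last pair.
    arraySort(
        arrayZip(tupleElement(per_minute, 1), tupleElement(per_minute, 2))
    )[-1] AS last_minute
FROM values(
    'id Int32, agg_time DateTime, agg_volume Float64',
    (1, toDateTime('2023-09-20 10:00:00'), 5),  -- hypothetical rows
    (1, toDateTime('2023-09-20 10:01:00'), 3),
    (1, toDateTime('2023-09-20 10:01:00'), 4),
    (2, toDateTime('2023-09-20 09:00:00'), 2)
)
GROUP BY id
ORDER BY id;
```

With these rows, id 1 should come back with the 10:01 minute and a volume of 7 (3 + 4), and id 2 with its only minute, 09:00. Note that the map keeps one entry per minute per id, which is why the state grows with the length of the history.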

More generally, we could introduce an aggregate function, argMaxReduce, which aggregates only the rows that have the maximum argument.

@amosbird amosbird self-assigned this Sep 21, 2023
@jozefRudy
Author

jozefRudy commented Sep 21, 2023

Thank you @amosbird for a good suggestion which I was not aware of. We plan to have years of 1-minute prices, and my assumption is that this approach will not perform well for getting the last minute. If I did the performance test correctly today, with 1 year of 1-minute data for 10,000 instruments, it took seconds, so I abandoned it. I did not get into arrayReduce('argMax'), which would be more efficient than arraySort, but I still believe the approach I came up with might be much more performant.

I decided to calculate the last-minute volume per id in a separate table. We want the last available minute for all ids (potentially 10,000 instruments). Getting prices as described above is fast with the projection. The only problem is volume.

I am using ReplacingMergeTree, where we keep all ids, and the version is defined by toEndOfMinute(time) in the materialized view. So all updates per id stay, and are efficiently disposed of when the next minute comes.

CREATE TABLE db.last_minute_agg (
    id Int32,
    time DateTime,
    volume SimpleAggregateFunction(sum, Nullable(Float64))
) ENGINE = ReplacingMergeTree(time)
ORDER BY id;

CREATE MATERIALIZED VIEW db.last_minute_mv TO db.last_minute_agg AS
SELECT
    id,
    toEndOfMinute(time) as time,
    sum(volume) as volume
FROM db.raw_bars
GROUP BY 1, 2;

ALTER TABLE db.last_minute_agg ADD PROJECTION last_items (
    SELECT
        id,
        time,
        sum(volume) AS volume
    group by 1, 2
);

Then, to get the volume for all ids from the table (in case the ReplacingMergeTree has not merged yet), we need to select the latest interval. There should not be too many intervals per id since the last merge, so this should be reasonably quick.

SELECT
    id,
    time,
    sum(volume) AS volume
FROM db.last_minute_agg lt
GROUP BY id, time
ORDER BY id, time DESC
LIMIT 1 BY id

This is the selection I am using for the latest volumes, which I join on id with the latest prices from the price table, and I am done.
Hopefully this helps someone (feedback appreciated if I am doing something terribly wrong).

@amosbird
Collaborator

amosbird commented Sep 21, 2023

@jozefRudy Projections are not supported in ReplacingMergeTree yet. You will end up with incorrect query results.

> We plan to have years of 1-minute prices

Yeah, the solution I posted will not scale well. I guess the right path is to build an argMaxReduce function. Your use case is good enough to deserve the effort. Here is a prototype:

ALTER TABLE db.minute_bars_agg ADD PROJECTION last_items (
SELECT
    id,
    max(agg_time),
    maxMerge(agg_last_update),
    argMaxMerge(agg_bid_price),
    argMaxMerge(agg_ask_price),
    argMaxMerge(agg_bid_size),
    argMaxMerge(agg_ask_size),
    argMaxReduce('sum', agg_volume, agg_time)
GROUP BY id
);

@jozefRudy
Author

That would be awesome (basically the ability to use one aggregation function on the smaller interval [minute -> sum] and a different one on the higher level -> max of those).

I think for now we will live without the latest minute's volume and wait for the implementation. I obviously do not want to push for this, but is 6 months a good estimate for this feature to be implemented (just to have a rough idea)?

@amosbird
Collaborator

> but is 6 months a good estimate for this feature to be implemented

I'll try to implement it this weekend. If everything goes fine, it could land in October.

@amosbird
Collaborator

It's better to implement -Min/-Max combinators.

ALTER TABLE db.minute_bars_agg ADD PROJECTION last_items (
SELECT
    id,
    max(agg_time),
    maxMerge(agg_last_update),
    argMaxMerge(agg_bid_price),
    argMaxMerge(agg_ask_price),
    argMaxMerge(agg_bid_size),
    argMaxMerge(agg_ask_size),
    sumMax(agg_volume, agg_time)
GROUP BY id
);

@jozefRudy
Author

jozefRudy commented Oct 27, 2023

@amosbird I noticed that #54947 has not had any activity recently. Should we understand that there is a fundamental problem?

This should generalize to different combinators, e.g. imagine we also want the open, high, low and close of the last minute's price (e.g. bid) in a projection.

So we would have sumMax (as you suggested) and also maxMax, minMax, and then maybe firstMax, lastMax?

Extending the projection from above:

ALTER TABLE db.minute_bars_agg ADD PROJECTION last_items (
SELECT
    id,
    max(agg_time),
    maxMerge(agg_last_update),
    sumMax(agg_volume, agg_time) as Volume,
    maxMax(agg_bid_price, agg_time) as High,
    minMax(agg_bid_price, agg_time) as Low,
    firstMax(agg_bid_price, agg_time) as Open,
    lastMax(agg_bid_price, agg_time) as Close
GROUP BY id
);

@amosbird
Collaborator

> i noticed that the #54947 has not had activity recently. Should we understand there is a certain fundamental problem?

@alexey-milovidov said he will review the PR this week.

> then maybe firstMax, lastMax?

What would first and last do here? There isn't any order. any and anyLast don't make sense to me either.

BTW, the combinator has been changed to -ArgMin -ArgMax
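
For readers landing here later, a minimal sketch of the renamed combinator on toy data. It assumes a ClickHouse version that includes the -ArgMax combinator, and it uses made-up rows with plain Float64 volumes instead of the AggregateFunction states from the schema above, so it shows the semantics rather than the final projection.

```sql
SELECT
    id,
    max(agg_time) AS last_minute,
    -- Sum only the rows whose agg_time equals the per-id maximum.
    sumArgMax(agg_volume, agg_time) AS last_minute_volume,
    -- For comparison: a plain sum covers every minute of the id.
    sum(agg_volume) AS total_volume
FROM values(
    'id Int32, agg_time DateTime, agg_volume Float64',
    (1, toDateTime('2023-09-20 10:00:00'), 5),  -- hypothetical rows
    (1, toDateTime('2023-09-20 10:01:00'), 3),
    (1, toDateTime('2023-09-20 10:01:00'), 4),
    (2, toDateTime('2023-09-20 09:00:00'), 2),
    (2, toDateTime('2023-09-20 09:00:00'), 6)
)
GROUP BY id
ORDER BY id;
```

With these rows, id 1 should report a last-minute volume of 7 against a total of 12, while id 2, whose latest minute is an hour older, should report 8 for both. The "last minute" is resolved per id, not relative to the current time.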

@jozefRudy
Author

jozefRudy commented Oct 27, 2023

The idea was to get the first price for the last agg_time.

That is, for the same (max) agg_time, get the first entered price. But maybe this is impossible here since, as you say, rows with the same agg_time are not ordered.

@amosbird
Copy link
Collaborator

> the idea was to get the first price for last agg_time.

That's invalid, because agg_bid_price is already an aggregated value of bid_price. You need to introduce two different aggregates: first_agg_bid_price and last_agg_bid_price.

@jozefRudy
Author

jozefRudy commented Oct 27, 2023

I think this is what you mean:

CREATE TABLE db.raw_bars
(
    id Int32,
    time DateTime,

    bid_price Nullable(Float64),
    ask_price Nullable(Float64),
    bid_size Nullable(Float64),
    ask_size Nullable(Float64),

    open_trade Nullable(Float64),
    high_trade Nullable(Float64),
    low_trade Nullable(Float64),
    close_trade Nullable(Float64),

    volume Nullable(Float64)
) ENGINE = Null();

aggregation table ->

CREATE TABLE db.minute_bars_agg (
    id Int32,
    agg_time DateTime CODEC(Delta, ZSTD),
    agg_last_update AggregateFunction(max, DateTime) CODEC(ZSTD),
    agg_bid_price AggregateFunction(argMax, Nullable(Float64), DateTime) CODEC(ZSTD),
    agg_ask_price AggregateFunction(argMax, Nullable(Float64), DateTime) CODEC(ZSTD),
    agg_bid_size AggregateFunction(argMax, Nullable(Float64), DateTime) CODEC(ZSTD),
    agg_ask_size AggregateFunction(argMax, Nullable(Float64), DateTime) CODEC(ZSTD),

    agg_open_trade AggregateFunction(argMin, Nullable(Float64), DateTime) CODEC(ZSTD),
    agg_high_trade AggregateFunction(max, Nullable(Float64)) CODEC(ZSTD),
    agg_low_trade AggregateFunction(min, Nullable(Float64)) CODEC(ZSTD),
    agg_close_trade AggregateFunction(argMax, Nullable(Float64), DateTime) CODEC(ZSTD),

    agg_volume AggregateFunction(sum, Nullable(Float64)) CODEC(ZSTD)
) ENGINE = AggregatingMergeTree 
    PARTITION BY toYYYYMM(agg_time) 
    ORDER BY (id, agg_time);

materialized view ->

CREATE MATERIALIZED VIEW db.minute_bars_mv to db.minute_bars_agg
AS
select
    id,
    toStartOfMinute(time) as agg_time,
    maxState(time) as agg_last_update,
    argMaxState(bid_price, time) as agg_bid_price,
    argMaxState(ask_price, time) as agg_ask_price,
    argMaxState(bid_size, time) as agg_bid_size,
    argMaxState(ask_size, time) as agg_ask_size,

    argMinState(open_trade, time) as agg_open_trade,
    maxState(high_trade) as agg_high_trade,
    minState(low_trade) as agg_low_trade,
    argMaxState(close_trade, time) as agg_close_trade,

    sumState(volume) as agg_volume
from db.raw_bars
group by 1, 2;

But a projection is currently not possible for open, high, low and volume, since the projection would need to take those for the last minute only, and that requires the PR in progress. Am I correct?

In other words, the projection below is possible today, but it cannot include the first trade of the last minute (as opposed to the first trade of the entire series, which is not what we want); the same goes for the min and max trade in the last minute.

ALTER TABLE db.minute_bars_agg ADD PROJECTION last_items (
SELECT
    id,
    max(agg_time),
    maxMerge(agg_last_update),
    argMaxMerge(agg_bid_price),
    argMaxMerge(agg_ask_price),
    argMaxMerge(agg_bid_size),
    argMaxMerge(agg_ask_size),

    -- open, high, low are not possible here, since we want them for the last minute only

    argMaxMerge(agg_close_trade)
GROUP BY id
);

@amosbird
Collaborator

amosbird commented Oct 27, 2023

You need the following:

    min((time, trade)) as agg_open_trade,
    max((time, trade)) as agg_close_trade,
    ...

In projection:

    minArgMax(agg_open_trade, agg_time).2 as Open,
    maxArgMax(agg_close_trade, agg_time).2 as Close,
    ...
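
A self-contained sketch of this tuple trick on toy data, assuming a ClickHouse version with the -ArgMax combinator. The raw rows, the single trade column and the open_pair / close_pair names are made up for illustration, and the subquery only stands in for the materialized view; tupleElement(x, 2) is the function form of x.2.

```sql
SELECT
    id,
    max(agg_time) AS last_minute,
    -- Among the rows of the latest minute, keep the smallest / largest
    -- (time, trade) pair and read the trade out of it.
    tupleElement(minArgMax(open_pair, agg_time), 2) AS Open,
    tupleElement(maxArgMax(close_pair, agg_time), 2) AS Close
FROM
(
    -- Per-minute pre-aggregation; tuples compare by time first.
    SELECT
        id,
        toStartOfMinute(time) AS agg_time,
        min((time, trade)) AS open_pair,
        max((time, trade)) AS close_pair
    FROM values(
        'id Int32, time DateTime, trade Float64',
        (1, toDateTime('2023-09-20 10:00:10'), 100),  -- hypothetical rows
        (1, toDateTime('2023-09-20 10:01:05'), 101),
        (1, toDateTime('2023-09-20 10:01:40'), 103),
        (1, toDateTime('2023-09-20 10:01:55'), 102)
    )
    GROUP BY id, agg_time
)
GROUP BY id;
```

For id 1 the latest minute is 10:01, so Open should be 101 (the 10:01:05 trade) and Close should be 102 (the 10:01:55 trade). The 10:00 trade is ignored.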

@jozefRudy
Author

Yes, that is what I meant: this is currently not possible, only after this PR gets merged.

@amosbird
Collaborator

@jozefRudy It's merged.
