Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KSQL should support aggregations on tables #778

apurvam opened this issue Feb 22, 2018 · 2 comments


Copy link

commented Feb 22, 2018

Edit: We have a separate (child) ticket that tracks supporting aggregations on non-windowed tables, see #1136.

Say you want to count the page views per user across multiple time windows. You may have a pageviews stream which has fields like userid, pageid, etc. for every page view in your system.

If you want a view of activity by user every 30 minutes and also every day, you may do something like this:

CREATE TABLE user_activity_30_mins AS
  SELECT userid, COUNT(*) AS num_pageviews
  FROM pageviews
  GROUP BY userid;

CREATE TABLE user_activity_24_hr AS
  SELECT userid, sum(num_pageviews)
  FROM user_activity_30_mins
  GROUP BY userid;

The above won't work, because internally the first query will emit multiple counts per window, and the second query would sum them all, thus over counting. If this was a single streams app and hence a single topology, Kafka Streams will emit undo messages whenever a new value is emitted in a given window, hence allowing the downstream sum to still be correct.

However, in KSQL both queries are different streams applications, and hence the undo records are not emitted, resulting in over counting.

The above behavior is unintuitive. It means that any kind of 'roll up' application will have wrong results. We need to explore ways to to ensure that only one aggregate is emitted per window, or that all queries in an 'group' behave like a single streams application and enjoy the benefits of 'undo' records, etc.

@apurvam apurvam added the enhancement label Feb 22, 2018

@apurvam apurvam changed the title KSQL behavior is not intuitive when aggregates are applied on aggregates Improve KSQL behavior when aggregates are applied on aggregates Feb 22, 2018

@big-andy-coates big-andy-coates added the bug label Mar 2, 2018

@apurvam apurvam removed the enhancement label Mar 13, 2018

@apurvam apurvam changed the title Improve KSQL behavior when aggregates are applied on aggregates Improve KSQL behavior when aggregates are applied on tables Mar 29, 2018

@apurvam apurvam changed the title Improve KSQL behavior when aggregates are applied on tables KSQL should support aggregations on tables Mar 29, 2018


This comment has been minimized.

Copy link

commented Apr 2, 2018

Peeling back the onion on this a bit.

At a high level our approach should be to:

  1. Declare a KStream over the source changelog topic
  2. Reduce it into a KTable
  3. Group the table into a GroupedKTable. This will produce an old/new changelog
  4. Aggregate the GroupedKTable. This includes providing subtractors for handling the old records in the changelog stream. Note that we can only do this for aggregate functions that we can reasonably implement subtractors for. For example, implementing a subtractor for max would require saving all previous values for a key which is prohibitive.

However I think it will be difficult to deal with windowing for table aggregations in the near term. I see 2 issues here:

  1. Dealing with a windowed source. If we just reduce the source into a table, that table would grow larger and larger over time. Ideally we want some way to set a retention period for it. One option could be to reduce it with a tumbling window. This way Kafka Streams would expire windows eventually. The window size here isn’t terribly important for correctness. Its really just a mechanism to get streams to expire records. This feels like a hack though. I think it may be better to add a table call to the StreamsBuilder that accepts Materialized<K,V,WindowStore>

  2. Windowing the aggregation itself. KafkaStreams currently doesn’t have an API for windowing a KGroupedTable. We might be able to hack around this from KSQL to produce an undo-redo changelog KStream which could then be windowed. This would entail:

    • Call KTableImpl.enableSendingOldValues on the reduced table.
    • Call a custom implementation of toStream that retains the Change objects that encapsulate the old and new records.
    • Finally, window and aggregate that stream. We would have to write an aggregator that can handle Change objects and call the adder/subtractor when appropriate.

    I think we should not go this route and instead just implement the behavior we want in Streams itself.

I think for the near term in KSQL the best thing to do is to treat all sources as un-windowed tables, and not support windowing for table aggregates. If the needed windowing APIs get added to Streams we can expose that functionality.

To handle aggregations of aggregations like the original example from this issue, a user could just aggregate the original stream with the desired window. I see no disadvantage to doing the aggregation that way (as opposed to aggregating the aggregation)

So with the proposed restrictions the only use cases we would really be lacking the ability address are:
- A user wanting to do windowed aggregations of the changelog of an unwindowed table
- A user wanting to aggregate to consolidate/split windows of a table windowed by a system external to ksql


This comment has been minimized.

Copy link

commented Mar 26, 2019

@rodesai can this issue be re-opened? Seems like it's still a valid issue to track.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
None yet
4 participants
You can’t perform that action at this time.