Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6936: Implicit Materialized for aggregates #5066

Merged
merged 1 commit into from
Jun 1, 2018

Conversation

joan38
Copy link
Contributor

@joan38 joan38 commented May 22, 2018

In #4919 we propagate the SerDes for each of these aggregation operators.

As @guozhangwang mentioned in that PR:

reduce: inherit the key and value serdes from the parent XXImpl class.
count: inherit the key serdes, enforce setting the Serdes.Long() for value serdes.
aggregate: inherit the key serdes, do not set for value serdes internally.

Although it's all good for reduce and count, it is quiet unsafe to have aggregate without Materialized given. In fact I don't see why we would not give a Materialized for the aggregate since the result type will always be different (otherwise use reduce) and also the value Serde is simply not propagated.

This has been discussed previously in a broader PR before but I believe for aggregate we could pass implicitly a Materialized the same way we pass a Joined, just to avoid the stupid case. Then if the user wants to specialize, he can give his own Materialized.

@guozhangwang @debasishg @seglo Let me know your thoughts.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@joan38 joan38 changed the title Implicit materialized for aggregates Implicit Materialized for aggregates May 22, 2018
@debasishg
Copy link
Contributor

Regarding the following ..

aggregate: inherit the key serdes, do not set for value serdes internally.

For the aggregate method that does not take a Materialized can't we propagate the value serde as well ? If not, then, this PR makes sense to me.

@joan38
Copy link
Contributor Author

joan38 commented May 22, 2018

@debasishg How can you propagate the Serde if the resulting type of the aggregate is not the same as the previous one? This would propagate the wrong Serde.
reduce is safe since it enforces the same return type as the inputs and count is a Long.

@guozhangwang
Copy link
Contributor

@debasishg We are removing the deprecated APIs in Java that only takes a serde as parameters, so now aggregate operators only have two overloaded variant: with and without a Materialized.

@joan38 Currently there is no semantics difference between the two overloaded functions with and without the Materialized, since even for the latter case, our internal implementation will still created a materialized store which would not be exposed to users for queries. But moving forward we have plans to optimize the topology such that under some cases, for example, in aggregations when Materialized is not passed in, we will not create a Materialized store. I think by that time we could think about other ways to go around this issue, so I'm good with this change.

TimeWindowedKStream => TimeWindowedKStreamJ,
KGroupedTable => KGroupedTableJ, _}

import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, KGroupedTable => KGroupedTableJ, KStream => KStreamJ, KTable => KTableJ, SessionWindowedKStream => SessionWindowedKStreamJ, TimeWindowedKStream => TimeWindowedKStreamJ, _}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intentional to put them in a single line? Multi-lines are more human readable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this change has been introduced by my IDE and doesn't make sense. I will revert it.

@joan38 joan38 force-pushed the materialized branch 2 times, most recently from c90332e to 1cd7718 Compare May 24, 2018 13:37
@guozhangwang guozhangwang changed the title Implicit Materialized for aggregates KAFKA-6936: Implicit Materialized for aggregates May 24, 2018
Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a meta comment: to make aggregate reduce and count API consistency, I'd still prefer either maintain two overloads for each, or one for each, instead of two for reduce and count and one for aggregate. Personally I'd prefer two for each as it will benefit in the future to optimize away physical materialization, but I'm not sure if we can disambiguate aggregate(..)(implicit Materialized) with aggregate(..), is it possible?

On the other we could consider making reduce and count also only having one overload function. E.g. in KGroupedTable:

def count(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long]

def reduce(adder: (V, V) => V)(subtractor: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V]

Note for reduce I used the same pattern in the other PR trying to leverage on type inference, but again I'm not 100% sure if it would work.

@debasishg @joan38 wdyt?

@debasishg
Copy link
Contributor

@guozhangwang I agree in principle. I am on a vacation right now and don't have the infrastructure to code. It would be good if @joan38 could verify your concerns.

@joan38
Copy link
Contributor Author

joan38 commented May 25, 2018

@guozhangwang the first way of doing is what I tried, unfortunately it's not able to desambiguate the 2.

I think the other way around unifying reduce and count with aggregate is a good idea.
I will do the change today.

@joan38 joan38 force-pushed the materialized branch 2 times, most recently from c95686a to fa72082 Compare May 25, 2018 21:11
@joan38
Copy link
Contributor Author

joan38 commented May 27, 2018

Should we do count instead of count()?

@joan38 joan38 force-pushed the materialized branch 2 times, most recently from df38a90 to 7f7be17 Compare May 30, 2018 07:17
@joan38
Copy link
Contributor Author

joan38 commented May 30, 2018

@guozhangwang @debasishg Any news on this?

@guozhangwang
Copy link
Contributor

Sorry for the late reply.

If we cannot desambiguate the two overloaded functions for count let's make the other two: aggregate and reduce also have one function only then.

I'd prefer count() over count since, in Scala's principle, if it has any side-effects internally we should leave with with the bracket. As for our case, internally we would add some stateful processor node into the topology with this API, so count() would be preferred.

@joan38
Copy link
Contributor Author

joan38 commented May 30, 2018

Ok so the first point is done.

I will revert back to count() since there is a side effect.

Thanks

@guozhangwang
Copy link
Contributor

Note the jenkins failures are relevant:

07:20:40 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala:94: not enough arguments for method count: (implicit materialized: org.apache.kafka.streams.kstream.Materialized[String,Long,org.apache.kafka.streams.scala.ByteArrayKeyValueStore])org.apache.kafka.streams.scala.kstream.KTable[String,Long].
07:20:40 Unspecified value parameter materialized.
07:20:40           .count()
07:20:40                 ^
07:20:40 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala:122: not enough arguments for method count: (implicit materialized: org.apache.kafka.streams.kstream.Materialized[String,Long,org.apache.kafka.streams.scala.ByteArrayKeyValueStore])org.apache.kafka.streams.scala.kstream.KTable[String,Long].
07:20:40 Unspecified value parameter materialized.
07:20:40         .count()(Materialized.as("word-count"))
07:20:40               ^
07:20:40 two errors found
07:20:40 

@joan38
Copy link
Contributor Author

joan38 commented May 31, 2018

@guozhangwang The errors are fixed now. This should be ready to go I guess?

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One nit comment, otherwise LGTM.

.groupBy((k, v) => v)
.count()
.groupBy((_, v) => v)
.count
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should call count() here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arf, missed this one. Thanks for picking it

@guozhangwang guozhangwang merged commit ad56f04 into apache:trunk Jun 1, 2018
@joan38
Copy link
Contributor Author

joan38 commented Jun 1, 2018

Thanks @guozhangwang

ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
…ache#5066)

In apache#4919 we propagate the SerDes for each of these aggregation operators.

As @guozhangwang mentioned in that PR:

```
reduce: inherit the key and value serdes from the parent XXImpl class.
count: inherit the key serdes, enforce setting the Serdes.Long() for value serdes.
aggregate: inherit the key serdes, do not set for value serdes internally.
```

Although it's all good for reduce and count, it is quiet unsafe to have aggregate without Materialized given. In fact I don't see why we would not give a Materialized for the aggregate since the result type will always be different (otherwise use reduce) and also the value Serde is simply not propagated.

This has been discussed previously in a broader PR before but I believe for aggregate we could pass implicitly a Materialized the same way we pass a Joined, just to avoid the stupid case. Then if the user wants to specialize, he can give his own Materialized.

Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>
@joan38 joan38 deleted the materialized branch August 13, 2018 23:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants