Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-4218: Add withkey methods to KGroupedTable #3601

Closed

Conversation

jeyhunkarimov
Copy link
Contributor

This PR aims to add withKey methods to KGroupedTable interface.
This PR is part of KIP-149.
I separated the complete PR into 4 parts as discussed in here. So, this PR concentrates on adding withKey methods to KGroupedTable interface.

Add withkey interfaces

Add withKey converters in AbstractStream

Add withKey interface to KGroupedTable

Remove unnecessary lines

Remove unnecessary lines
@asfgit
Copy link

asfgit commented Jul 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6438/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit
Copy link

asfgit commented Jul 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6423/
Test PASSed (JDK 8 and Scala 2.12).

@dguy
Copy link
Contributor

dguy commented Jul 31, 2017

@bbejeck @mjsax @enothereska @guozhangwang reviews please


/**
* The {@code InitializerWithKey} interface for creating an initial value in aggregations with read-only key.
* Note that provided keys are read-only and should not be modified. Any key modification can result in corrupt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the provided

* For sum, the adder and substractor would work as follows:
* <pre>{@code
* public class SumAdder implements ReducerWithKey<Integer, Integer> {
* public Integer apply(Integer key, Integer currentAgg, Integer newValue) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this example show the key being used somehow?

* }
*
* public class SumSubtractor implements ReducerWithKey<Integer, Integer> {
* public Integer apply(Integer key, Integer currentAgg, Integer oldValue) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

* public Integer apply(Integer currentAgg, Integer oldValue) {
* return currentAgg - oldValue;
* public class SumSubtractor implements ReducerWithKey<Integer, Integer> {
* public Integer apply(Integer currentAgg, Integer oldValue) {
Copy link
Contributor

@bbejeck bbejeck Aug 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the ReducerWithKey#apply impl here missing the Integer key parameter?

* For sum, the adder and substractor would work as follows:
* <pre>{@code
* public class SumAdder implements ReducerWithKey<Integer, Integer> {
* public Integer apply(Integer key, Integer currentAgg, Integer newValue) {
Copy link
Contributor

@bbejeck bbejeck Aug 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto from above, should we have an example using the key? The same for the rest of the javadoc code examples.


/**
* Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
* mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
* Records with {@code null} key are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
* for example, allows the result to have a different type than the input values.
* If the result value type does not match the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value
* If the result value type does not match the {@link StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this intentional? VALUE_SERDE_CLASS_CONFIG is deprecated.

@bbejeck
Copy link
Contributor

bbejeck commented Aug 1, 2017

Thanks @jeyhunkarimov for the work and multiple PRs. I've made an initial pass. Two things so far:

  1. Basically some nits on the javadoc. Also IMHO I think the newer method need updated examples, what's there looks similar to the previous docs.
  2. We need test coverage on the new methods, so far I only see some updates to the existing `KGroupedTableImplTest.

EDIT: can you rebase as well? thanks.

@mjsax mjsax added the streams label Jan 30, 2018
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020
@mjsax mjsax closed this Feb 6, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kip Requires or implements a KIP streams
Projects
None yet
5 participants