Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5045: KTable cleanup #2832

Closed
wants to merge 32 commits into from
Closed

KAFKA-5045: KTable cleanup #2832

wants to merge 32 commits into from

Conversation

enothereska
Copy link
Contributor

@enothereska enothereska commented Apr 10, 2017

This is the implementation of KIP-114: KTable state stores and improved semantics:

  • Allow for decoupling between querying and materialisation
  • consistent APIs, overloads with queryableName and without
  • depreciated several KTable calls
  • new unit and integration tests

In this implementation, state stores are materialized if the user desires them to be queryable. In subsequent versions we can offer a second option, to have a view-like state store. The tradeoff then would be between storage space (materialize) and re-computation (view). That tradeoff can be exploited by later query optimizers.

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2852/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2848/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2857/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2861/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2856/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2867/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2868/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2872/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Apr 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2885/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Apr 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2880/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Apr 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2884/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Apr 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2885/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Apr 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2889/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Apr 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2964/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Apr 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2969/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Apr 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2965/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Apr 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3083/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Apr 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3308/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Apr 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3317/
Test FAILed (JDK 8 and Scala 2.11).

@enothereska
Copy link
Contributor Author

Test failure unrelated to streams or this PR.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I share the same concern with @dguy that, with this change it seems no matter if user provide the queryable name or not we will always materialize every single KTable with an underlying store. This will kill the performance of Streams. I understand that this may be done in a follow-up PR but would like to discuss how we will change the underlying impl?

* @param prefix processor name prefix
* @return a new unique name
*/
public String newStoreName(final String prefix) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a JIRA for tracking this change, or any of the existing KIPs include this one?

@@ -47,7 +50,7 @@
* final KafkaStreams streams = ...;
* streams.start()
* ...
* final String queryableStoreName = table.getStoreName(); // returns null if KTable is not queryable
* final String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, right now even if the user does not provide a queryable name in the operator that generates this KTable, we would still return the internal name in this call. Is that right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as before. No changes. Just name change (I've added name change to KIP)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will adjust to return null if no queryable name is passed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)()}
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
*/
KTable<K, V> through(final String topic,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand adding the store name / store supplier here is to achieve consistency with builder#table, but I'm wondering what would be the recommended pattern for using through. I.e. with the following topology:

KTable table1 = table0.someOps(..., "name1");
KTable table2 = table1.through("topic", "name2");

What's the difference of querying store name1 and querying store name2? Would those be the same stores? Shall we in this case save one store in the underlying impl?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'd have two queryable stores if you did that pattern.
We could add this to an optimization JIRA, sure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is fine, just curious if we have ever thought about how users would leverage the APIs to determine which stores to query. We can discuss this in a follow-up JIRA.

@asfbot
Copy link

asfbot commented May 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3353/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3348/
Test PASSed (JDK 7 and Scala 2.10).

@enothereska
Copy link
Contributor Author

enothereska commented May 2, 2017

@guozhangwang regarding "queryable name or not we will always materialize every single KTable with an underlying store". It's not true, we only materialize a KTable if a queryable name is provided, otherwise we don't by default (unless it must be materialized, like an aggregation). For example, if no name is provided in filter, we do not materialize.

@dguy 's concern is different, he points out that we can have virtual materializations and provide views on top of KTables, instead of materializing them. My argument is that an eventual optimizer can choose whether to materialize (use storage) or provide a view (re-compute) and we can add a viewer implementation too eventually. Might require a KIP (e.g., on how the optimizer can choose at runtime between the two).

@asfbot
Copy link

asfbot commented May 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3344/
Test FAILed (JDK 8 and Scala 2.12).

return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.queryableStoreName);
}

private KTable<K, V> doQueryableFilter(final Predicate<? super K, ? super V> predicate,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we collapse the code path for having a queryable store name or not into the same function? For example:

filter(.. /*nothing*/) calls filter(.. (String) null);

filter(.. "storeName") calls filter(.. storeSupplier); // if storeName is not null, otherwise pass null as well

filter(.. supplier) do the actual impl, which checks if supplier is null or not

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made the change, but it's not quite exactly as above since the code that takes the supplier does not accept a null supplier. But it's similar.

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3407/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3416/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3410/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3426/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3420/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3417/
Test FAILed (JDK 8 and Scala 2.12).

@enothereska
Copy link
Contributor Author

System tests passed again https://jenkins.confluent.io/job/system-test-kafka-branch-builder-2/287/ & https://jenkins.confluent.io/job/system-test-kafka-branch-builder-2/286/ @guozhangwang . One unit test is unrelated to streams, one is queryOnRebalance timeout again, which we see every now and then and I guess is not fixed yet.

@guozhangwang
Copy link
Contributor

retest this please

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3444/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3450/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3441/
Test FAILed (JDK 8 and Scala 2.12).

@enothereska
Copy link
Contributor Author

enothereska commented May 3, 2017

kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures FAILED
kafka.api.ProducerBounceTest > testBrokerFailure FAILED

@guozhangwang
Copy link
Contributor

LGTM!

@guozhangwang
Copy link
Contributor

Merged to trunk. Thanks @enothereska !

@asfgit asfgit closed this in ec9e4ea May 3, 2017
@enothereska enothereska deleted the KAFKA-5045-ktable branch May 4, 2017 06:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants