KAFKA-13491: IQv2 framework #11557

Merged
merged 3 commits into from Dec 3, 2021

vvcephei
Contributor

Implements a new interactive query framework,
as described in KIP-805 (https://cwiki.apache.org/confluence/x/85OqCw).

No public queries are added in this PR, just the framework and tests.

Also, position tracking and bounding are not implemented in this PR.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor Author

@vvcephei vvcephei left a comment

Hey @patrickstuedi and @vpapavas, I'd really appreciate your reviews on this. Also, @cadonna , @bbejeck , or @guozhangwang , since you were involved in the discussion and voting, would you mind giving this a look?

I'm sorry about the length, but it's hopefully a pretty straightforward implementation of the KIP, with a lot of repetition and boilerplate.

Thanks in advance for your time!

* topology.
*/
@Evolving
public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
Contributor Author

Main entry point for the new API.

Contributor Author

This implementation may look inefficient, but it's actually identical to the way that interactive queries are currently served. I think it could be improved, but I'm leaving it to future PRs.

* State#ERROR}. This Streams instance will need to be discarded and replaced before it can
* serve queries. The caller may wish to query a different instance.
*/
public class StreamsStoppedException extends InvalidStateStoreException {
Contributor Author

New exception proposed in the KIP. See KafkaStreams#query javadoc.

Comment on lines +49 to +50
* This is defined as the metadata of the record that is currently being
* processed (or was last processed) by the StreamTask that holds the store.
Contributor Author

This is a bit of a drive-by; I just noticed the wording could be improved.

* reference.
*/
@Evolving
public class Position {
Contributor Author

This adds the public Position API that was proposed in the KIP. I've also gone ahead and removed the private Position API (from the state.internals package).

return "in-memory";
}
};
return new InMemoryKeyValueBytesStoreSupplier(name);
Contributor Author

Having this inline made it harder to debug my test, so I just popped it out into a separate class.

* Once position bounding is generally supported, we should migrate tests to wait on the
* expected response position.
*/
public static <R> StateQueryResult<R> iqv2WaitForPartitionsOrGlobal(
Contributor Author

The javadoc pretty much sums this up. It's a harbinger of good things to come, since as you can see, we'll no longer be vulnerable to race conditions like rebalances sneaking in during the test. We can just wait for a valid result and then verify it.
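
The "wait for a valid result, then verify it" pattern described above can be sketched in plain Java. This is only an illustration under stated assumptions: `RetryUntilValid`, `await`, and its parameters are hypothetical names and do not reproduce the actual `iqv2WaitForPartitionsOrGlobal` helper or any Kafka Streams API.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical test helper; the names and signature are illustrative only.
final class RetryUntilValid {
    private RetryUntilValid() { }

    // Re-issue the query until the result passes validation or the timeout expires.
    static <T> T await(final Supplier<T> query,
                       final Predicate<T> isValid,
                       final long timeoutMs,
                       final long pauseMs) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            final T result = query.get();
            if (isValid.test(result)) {
                return result;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("No valid result within " + timeoutMs + " ms");
            }
            try {
                Thread.sleep(pauseMs);
            } catch (final InterruptedException e) {
                // Restore the interrupt flag and give up rather than swallow it.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }
}
```

A test would pass a supplier that issues the query and a predicate that rejects results containing failures, so a rebalance in the middle of the test only causes another retry instead of a spurious assertion failure.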

import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

public class PositionBoundTest {
Contributor Author

Just a basic unit test.

import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;

public class PositionTest {
Contributor Author

Another basic unit test, based on the internal API one that I removed.

}

@Test
public void shouldNotHash() {
Contributor Author

note: since this class is mutable, I wanted to make sure we don't accidentally use it in sets or anything.
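
One way to keep a mutable class out of hash-based collections is to make `hashCode()` fail fast. The sketch below assumes that approach; `MutableCounter` is a hypothetical class used only for illustration, and the exception type is an assumption rather than a statement about what `Position` actually does.

```java
// Hypothetical mutable value type; the name and fields are illustrative only.
final class MutableCounter {
    private long value;

    void increment() {
        value++;
    }

    @Override
    public boolean equals(final Object o) {
        return o instanceof MutableCounter && ((MutableCounter) o).value == value;
    }

    // Fail fast: a hash that changes as the object mutates would corrupt HashSet/HashMap lookups.
    @Override
    public int hashCode() {
        throw new UnsupportedOperationException(
            "This mutable object is not suitable for use as a hash key");
    }
}
```

With this in place, adding an instance to a `HashSet` throws immediately, which is the behavior a test like `shouldNotHash` can assert.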

import java.io.IOException;
import org.junit.Test;

public class PositionTest {
Contributor Author

These cases are migrated into the new unit test.

    .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
    .compute(
        partition,
        (integer, prior) -> prior == null || offset > prior ? offset : prior
Contributor

Nice, this avoids the use of AtomicLong!

Contributor Author

Thanks!
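
For readers following along, here is a minimal, self-contained sketch of the pattern praised above: keeping the highest offset per topic-partition with `ConcurrentHashMap.compute` instead of an `AtomicLong`. `OffsetTracker` and its methods are hypothetical names, not the actual `Position` class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a position tracker; not the actual Kafka Streams Position class.
final class OffsetTracker {
    // topic -> (partition -> highest offset seen so far)
    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsets =
        new ConcurrentHashMap<>();

    // Record an offset, keeping the larger of the new and the prior value.
    // compute() runs atomically per key, so no separate atomic counter is needed.
    OffsetTracker update(final String topic, final int partition, final long offset) {
        offsets
            .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
            .compute(
                partition,
                (p, prior) -> prior == null || offset > prior ? offset : prior
            );
        return this;
    }

    // Returns null if nothing has been recorded for the topic-partition.
    Long offsetFor(final String topic, final int partition) {
        final Map<Integer, Long> partitions = offsets.get(topic);
        return partitions == null ? null : partitions.get(partition);
    }
}
```

Because the remapping function only ever replaces a value with a larger one, an out-of-order update with a smaller offset leaves the stored offset unchanged.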

* Create a deep copy of the Position.
*/
public Position copy() {
return new Position(deepCopy(position));
Contributor

Why not implement copy using merge?
Position pos = Position.emptyPosition();
pos.merge(this);

Contributor Author

Thanks for the review!

That would also work, but this seems more direct.
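
To make the trade-off concrete, the following sketch shows both options on a simplified, hypothetical `SimplePosition` class (single-threaded, plain `HashMap`, not the real `Position`). Under these assumptions the two approaches produce equal contents; the direct copy simply skips the per-entry max comparison.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified position: topic -> partition -> offset. Not the real Position class.
final class SimplePosition {
    private final Map<String, Map<Integer, Long>> components = new HashMap<>();

    // Record an offset, keeping the larger value if one is already present.
    SimplePosition with(final String topic, final int partition, final long offset) {
        components.computeIfAbsent(topic, k -> new HashMap<>())
                  .merge(partition, offset, Long::max);
        return this;
    }

    // Option A: direct deep copy of the nested maps.
    SimplePosition copy() {
        final SimplePosition result = new SimplePosition();
        components.forEach((topic, partitions) ->
            result.components.put(topic, new HashMap<>(partitions)));
        return result;
    }

    // Option B building block: merge another position into this one.
    SimplePosition merge(final SimplePosition other) {
        other.components.forEach((topic, partitions) ->
            partitions.forEach((partition, offset) -> with(topic, partition, offset)));
        return this;
    }

    Map<String, Map<Integer, Long>> asMap() {
        return components;
    }
}
```

Here `original.copy()` and `new SimplePosition().merge(original)` yield equal maps, and neither shares inner maps with `original`.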

|| !active)) {
result.addResult(
partition,
QueryResult.forFailure(
Contributor

@vpapavas vpapavas Dec 1, 2021

Why do we add a failure here? The for loop will continue iterating and it might find an active task that hosts that store partition later, right? Don't you want to add the failure at the end of the loop if no such active partition is found?
Moreover, don't you want to exit the loop after the failure?

Contributor

Also, the failure reason is a bit misleading

Contributor Author

Thanks for the review!

We can just add the failure right away because tasks are uniquely assigned to Streams instances, so it's not possible to find a single task twice in this loop.

That's a good point about exiting as soon as the query is complete. I've updated the PR.

Also, that's a good point about the failure reason. I previously had a specific PositionBound to "requireLatest" (aka "require active"), but I moved it into the StateQueryRequest, since enforcing the task state is the responsibility of the framework, not a store. However, I never changed this FailureReason. I've fixed it with a new reason in the PR. Good catch!
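
The reasoning in this thread can be sketched as a small loop. This is an illustration only: `TaskQuerySketch`, `Task`, and the `"NOT_ACTIVE"` and `"OK"` strings are hypothetical placeholders, and the sketch assumes, as stated above, that each partition is owned by at most one task on the instance.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the per-task query loop; not the actual KafkaStreams#query code.
final class TaskQuerySketch {
    private TaskQuerySketch() { }

    // Hypothetical task descriptor: the partition it owns and whether it is active.
    static final class Task {
        final int partition;
        final boolean active;

        Task(final int partition, final boolean active) {
            this.partition = partition;
            this.active = active;
        }
    }

    // Returns partition -> outcome for the requested partitions found on this instance.
    static Map<Integer, String> query(final List<Task> tasks,
                                      final Set<Integer> requested,
                                      final boolean requireActive) {
        final Map<Integer, String> results = new LinkedHashMap<>();
        for (final Task task : tasks) {
            if (!requested.contains(task.partition)) {
                continue;
            }
            if (requireActive && !task.active) {
                // A partition shows up at most once in this loop, so the failure is final.
                results.put(task.partition, "NOT_ACTIVE"); // placeholder failure reason
            } else {
                results.put(task.partition, "OK");
            }
            if (results.keySet().containsAll(requested)) {
                return results; // every requested partition is handled; stop early
            }
        }
        return results;
    }
}
```

Since no later iteration can revisit a partition, recording the failure immediately is safe, and the early return avoids scanning the remaining tasks once all requested partitions have an outcome.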

Contributor Author

vvcephei commented Dec 1, 2021

Rebased to resolve conflicts with #11541

final long start = collectExecutionInfo ? System.nanoTime() : -1L;
// TODO: position tracking
if (query instanceof PingQuery) {
    result = (QueryResult<R>) QueryResult.forResult(true);
Contributor

How do we ensure type R matches the template type of the store?

Contributor Author

Unfortunately, it's not possible to guarantee that using Java's type system. The query caller will just have to be sure to submit correctly typed queries based on their a priori knowledge of the store's types. If they get it wrong, they'll get a ClassCastException at run time. This is a bummer, but it's also precisely the same in IQ today.

As far as I know, it's not possible to do better than this.
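
The limitation comes from type erasure, and a small standalone example shows the failure mode. `ErasedStore` is a hypothetical class invented for this sketch; it only mimics the situation where the caller chooses `R` and the compiler has no way to check it against what the store holds.

```java
// Hypothetical store holding a value whose static type is unknown to the caller.
final class ErasedStore {
    private final Object value;

    ErasedStore(final Object value) {
        this.value = value;
    }

    // The caller picks R; the compiler cannot verify it against the stored value.
    @SuppressWarnings("unchecked")
    <R> R query() {
        return (R) value; // unchecked cast: erased at run time, so nothing is checked here
    }
}
```

Calling `store.<Integer>query()` on a store holding an `Integer` works, while `String s = store.<String>query();` on the same store compiles cleanly and throws a `ClassCastException` at the assignment, which is where the compiler inserts the actual cast.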

Contributor

@cadonna cadonna left a comment

@vvcephei , Thanks for the PR!

You probably mean KIP-796 in the description, right?

I will approve your PR since it blocks other PRs and it doesn't seem to be risky to merge it. Please consider my feedback for a follow-up PR.

final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
if (globalStateStores.containsKey(storeName)) {
final StateStore store = globalStateStores.get(storeName);
final QueryResult<R> r =
Contributor

nit: Could you give a more meaningful name to this variable?

Contributor Author

Sure! Sorry about that.

Comment on lines +1822 to +1825
if (!request.isAllPartitions()
    && handledPartitions.containsAll(request.getPartitions())) {
    return result;
}
Contributor

nit: Instead of putting a comment, could you extract the condition and give it a meaningful name?

Contributor Author

Ah, I see what you mean. Yes, it's a bit mysterious this way.

}

return result;
}
Contributor

In general, the method is quite long. Could you try to extract methods with meaningful names? For example, the checks in the beginning to verify whether the state is known or the stream thread is running can be extracted to methods.

Contributor Author

For sure! I was actually hoping that we would have a chance to make this logic more efficient, so I didn't bother making it too pretty, but you're totally right. There's no guarantee on when or if we'll refactor it, so we should make it nice now.

@@ -164,7 +164,7 @@
               files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>

     <suppress checks="CyclomaticComplexity"
-              files="(StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>
+              files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>
Contributor

Is this really needed?

Contributor Author

Yes, for the same reason that you thought the query method was too long and complicated :)

Query<R> query,
PositionBound positionBound,
boolean collectExecutionInfo) {
// If a store doesn't implement a query handler, then all queries are unknown.
Contributor

Could you add this to the javadocs, since it seems to be important information for the outside world?

Contributor Author

You're totally right. I thought I had said that already, but maybe I'm thinking of some other conversation.
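
The behavior under discussion, where a store that does not implement a query handler treats every query as unknown, maps naturally onto a default interface method. The sketch below uses hypothetical, simplified types (`SketchStore`, `LegacyStore`, `PingableStore`, and string results); the real `StateStore` and `QueryResult` signatures differ.

```java
// Hypothetical, simplified store interface; the real StateStore API differs.
interface SketchStore {
    // Stores that do not override this method treat every query as unknown.
    default String query(final Object query) {
        return "UNKNOWN_QUERY_TYPE: " + query.getClass().getSimpleName();
    }
}

// A store written before the query API existed: it inherits the default.
final class LegacyStore implements SketchStore {
}

// A store that handles one query type and defers everything else to the default.
final class PingableStore implements SketchStore {
    @Override
    public String query(final Object query) {
        if ("ping".equals(query)) {
            return "pong";
        }
        return SketchStore.super.query(query);
    }
}
```

Existing store implementations keep compiling unchanged and report an unknown-query failure, while stores that opt in override the method for the query types they support.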

*
* @param <R> The result type of the query.
*/
public final class QueryResult<R> {
Contributor

Could you add unit tests for this class?

Contributor Author

I actually decided not to for the classes you pointed out because they are all trivial "struct"-type classes with no logic. I did, however, verify that all the new code has 100% (or as close as possible) code coverage via the existing set of tests in this PR.

* The query's result for global store queries. Is {@code null} for non-global (partitioned)
* store queries.
*/
public QueryResult<R> getGlobalResult() {
Contributor

I could not find this method in the KIP. Could you update the KIP?

/**
* Set the result for a partitioned store query. Used by Kafka Streams and available for tests.
*/
public void addResult(final int partition, final QueryResult<R> r) {
Contributor

I could not find this method in the KIP. Could you update the KIP?

/**
* Set the result for a global store query. Used by Kafka Streams and available for tests.
*/
public void setGlobalResult(final QueryResult<R> r) {
Contributor

I could not find this method in the KIP. Could you update the KIP?

Contributor Author

Gah! I'm sorry for that miss. I had it in the POC from the beginning, but overlooked it in the KIP.

import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;

public final class StoreQueryUtils {
Contributor

Could you add unit tests for this class?

Contributor Author

Yeah, you're right. These do need unit tests.

Contributor Author

vvcephei commented Dec 3, 2021

Thanks again for the review, @cadonna ! I'm tempted to fix that stuff in the current PR, but I'd like to unblock others from rebasing their work on top of this framework. Since none of the feedback would invalidate those rebases, I'll go ahead and merge and then follow up asap with a PR to address your feedback.

Thanks so much for your time.

Contributor Author

vvcephei commented Dec 3, 2021

The failures are unrelated:

    Build / ARM / kafka.server.ReplicaManagerTest.[1] usesTopicIds=true
    Build / JDK 8 and Scala 2.12 / kafka.server.ReplicaManagerTest.[1] usesTopicIds=true
    Build / JDK 11 and Scala 2.13 / kafka.server.ReplicaManagerTest.[1] usesTopicIds=true

@vvcephei vvcephei merged commit 14c2449 into trunk Dec 3, 2021
@vvcephei vvcephei deleted the iqv2-framework branch December 3, 2021 18:53
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
Implements the major part of the IQv2 framework as proposed in KIP-796.

Reviewers: Patrick Stuedi <pstuedi@apache.org>, Vicky Papavasileiou <vpapavasileiou@confluent.io>, Bruno Cadonna <cadonna@apache.org>