Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14761 Adding integration test for the prototype consumer #13303

Merged

Conversation

philipnee
Copy link
Collaborator

@philipnee philipnee commented Feb 24, 2023

The goal of this PR is to add more tests to the PrototypeAsyncConsumer to test

  1. Successful startup and shutdown.
  2. Commit.

I also added integration tests:

  1. Test commitAsync()
  2. Test commitSync()
    a. Note that I still need to implement committed() to test if commitSync() has been successfully committed.

Additional things:

  • Change KafkaConsumer<K, V> to Consumer<K, V> to use different implementations

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@@ -97,7 +97,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
}
}

protected def createConsumerWithGroupId(groupId: String): KafkaConsumer[Array[Byte], Array[Byte]] = {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing here was to change the type of Consumer<K, V>

@@ -306,7 +306,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val partitions = createTopicPartitions(topic, numPartitions = partitionCount, replicationFactor = brokerCount)

addConsumersToGroupAndWaitForGroupAssignment(consumerCount, mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, change to Consumer<K, V>

@@ -993,7 +993,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {

// subscribe all consumers to all topics and validate the assignment

val consumersInGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, make it Consumer<K, V>

@philipnee philipnee marked this pull request as ready for review March 5, 2023 06:38
});
}

private CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
Copy link
Collaborator Author

@philipnee philipnee Mar 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a KIP proposal here:

  1. WDYT about asyncCommit returning a CompletableFuture?
  2. WDYT about poll returning CompletableFuture?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't these methods part of the existing Consumer interface? Won't they be breaking changes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, for 1 - I think I actually wanted to say commitSync(), sorry about that.

I think they aren't part of the existing interface, right? Currently, both commitSync and poll don't return a future, and I was wondering if it makes sense for us to do that. Also, it makes the interface feels more modern.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we have those methods on the existing Consumer interface. Anyway, that doesn't impact this PR.

@philipnee
Copy link
Collaborator Author

Tests appear to be flaky:

Build / JDK 11 and Scala 2.13 / testMultiWorkerRestartOnlyConnector – org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest
2m 24s
Build / JDK 11 and Scala 2.13 / testElectUncleanLeadersForManyPartitions(String).quorum=kraft – kafka.api.PlaintextAdminIntegrationTest
22s
Build / JDK 17 and Scala 2.13 / shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions() – org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest
1m 43s
Build / JDK 17 and Scala 2.13 / shouldRestoreState() – org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest
1m 4s
Fixed 126

Copy link
Contributor

@rajinisivaram rajinisivaram left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@philipnee Thanks for the PR, looks good. Left some questions.

final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer) {
this.time = time;
this.time = Time.SYSTEM;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we remove time? Could be useful for unit testing?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for spotting this. I think the unit test consumer is meant to be the one below this. I think I should remove the time and make the constructor param list the same as the current one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think as we go along with this, we would need to add this param back to the constructor :) ANyways that's for future PRs then.

});
}

private CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't these methods part of the existing Consumer interface? Won't they be breaking changes?

CompletableFuture<Void> future = commit(offsets);
future.whenComplete((r, t) -> {
if (t != null) {
callback.onComplete(offsets, new RuntimeException(t));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why this is a RuntimeException rather than a KafkaException?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, it should've been a KafkaException.

new NoOpConsumerRebalanceListener());
assertEquals(1, consumer.subscription().size());
public void testBackgroundThreadRunning() {
consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to add some verification since the test name suggests we are testing if the background thread was started?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking what's the best way to test that. As we don't have direct access to the background thread from the consumer. I guess we could do that passively by trying to catch the exception, or maybe, actively, I should add a public State state() to the eventHandler? to allow user to prob the background thread stat... I feel we need both, but WDYT? @rajinisivaram @guozhangwang

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we don't need this afterall actually. I feel for the purpose of unit test, we should just try to test the API and method calls. The integration test in this PR is already kind of testing this (otherwise request won't come through)

}

private void injectConsumerConfigs() {
consumerProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this test currently attempts to connect to this? Is the intention to convert this into a unit test with mocks or only use it for tests where successful connections are not required?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's the intent: I don't think it is necessary to make the actual connection to the broker on the level of unit test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the longer term, it will be good to use a MockClient rather than one that attempts to connect and fills the log with exceptions. For now, this seems reasonable.

@philipnee
Copy link
Collaborator Author

Thanks for reviewing this, @rajinisivaram - fixes are on the way.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: Can we commit? We added two tests: I only saw one inside BaseAsyncConsumerTest? re: Unit tests in PrototypeConsumerTest: you mean PrototypeAsyncConsumerTest` right?

BTW could we also test fetch now with the pool method?

@@ -213,7 +215,7 @@ public UnsentRequest(
Objects.requireNonNull(requestBuilder);
this.requestBuilder = requestBuilder;
this.node = node;
this.callback = new FutureCompletionHandler();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: just call the member field handler as well?

new NoOpConsumerRebalanceListener());
assertEquals(1, consumer.subscription().size());
public void testBackgroundThreadRunning() {
consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

waitUntilTrue(() => {
cb.successCount == 1
}, "wait until commit is completed successfully", 5000)
consumer.commitSync();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why call commitSync here again? If the goal is to make sure the second commit also goes through shall we verify that on the broker's side?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 : although i don't know how to verify that but let me look into it. thanks!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the idea was to test both commitSync and commitAsync.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'll break it up into two tests there. It doesn't really make sense to call commit twice in a row.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other integration tests test the commit with committed method call. I'll leave this as TODO.

@philipnee
Copy link
Collaborator Author

re. @guozhangwang - sorry, I think originally I added 2 tests, then I reduced to 1. Thanks for catching this.

For testing poll() - The fetcher isn't available but I could try to test poll?

Thanks!

Copy link
Contributor

@rajinisivaram rajinisivaram left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@philipnee Thanks for the updates, LGTM. Looks like @guozhangwang 's comments have been addressed as well, but he may want to review again.

});
}

private CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we have those methods on the existing Consumer interface. Anyway, that doesn't impact this PR.

}

private void injectConsumerConfigs() {
consumerProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the longer term, it will be good to use a MockClient rather than one that attempts to connect and fills the log with exceptions. For now, this seems reasonable.

@philipnee
Copy link
Collaborator Author

Hey @rajinisivaram - Thanks! To your comments
I thought we have those methods on the existing Consumer interface. Anyway, that doesn't impact this PR. : The current consumer uses kafka specific future, but in the new re-write, we are kind of migrating to java CompletableFuture

@philipnee
Copy link
Collaborator Author

@guozhangwang - Regarding testing the commit results, can we defer this to the subsequent PR, i.e. verify it once committed() is implemented.

@guozhangwang
Copy link
Contributor

@guozhangwang - Regarding testing the commit results, can we defer this to the subsequent PR, i.e. verify it once committed() is implemented.

SGTM.

@guozhangwang guozhangwang merged commit 6fbe4d8 into apache:trunk Mar 8, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants