Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Fix IllegalStateException in Producer #6607

Closed

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Apr 18, 2019

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax mjsax added the producer label Apr 18, 2019
@mjsax mjsax requested a review from hachikuji April 18, 2019 23:20
@hachikuji
Copy link
Contributor

hachikuji commented Apr 18, 2019

A possible explanation for the the illegal state is that the server is requesting the client to throttle itself. In ClusterConnectionStates, we have the following logic:

    private boolean isReady(NodeConnectionState state, long now) {
        return state != null && state.state == ConnectionState.READY && state.throttleUntilTimeMs <= now;
    }

In maybeSendTransactionalRequest, we will await the throttle time inside in NetworkClientUtils.awaitReady, but the fact that we do not use the current time when sending might mean that we re-enter the throttling window.

Unfortunately, it does not seem to be straightforward to write a test case for this. The best I could suggest is to try to factor out some of the Sender logic into a separate utility. For example, we might have NetworkClientUtils.awaitAndSend. We also need to make changes to MockClient to be able to test this state.

Copy link
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax LGTM. I think I'd suggest we change this to a MINOR PR. We can resolve KAFKA-8248 when we have a test case that validates the fix. Sound good?

@@ -452,11 +452,12 @@ private boolean maybeSendTransactionalRequest(long now) {
if (targetNode != null) {
if (nextRequestHandler.isRetry())
time.sleep(nextRequestHandler.retryBackoffMs());
long currentTimeMs = time.milliseconds();
Copy link
Contributor

@hachikuji hachikuji Apr 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... Seems now is not used anymore?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Updated the PR.

@mjsax mjsax changed the title KAFKA-8248: Fix IllegalStateException in Producer MINOR: Fix IllegalStateException in Producer Apr 19, 2019
@mjsax
Copy link
Member Author

mjsax commented Apr 19, 2019

Java8 failed with

java.lang.AssertionError: expected:<1.0> but was:<0.0>
	at org.junit.Assert.fail(Assert.java:89)
	at org.junit.Assert.failNotEquals(Assert.java:835)
	at org.junit.Assert.assertEquals(Assert.java:555)
	at org.junit.Assert.assertEquals(Assert.java:685)
	at kafka.controller.ControllerIntegrationTest.testMetadataPropagationOnControlPlane(ControllerIntegrationTest.scala:105)

Seems unrelated. Java11 passed.

Retest this please.

@mjsax
Copy link
Member Author

mjsax commented Apr 19, 2019

Java11 failed with:

java.lang.AssertionError: Rebalance did not complete in time
	at org.junit.Assert.fail(Assert.java:89)
	at org.junit.Assert.assertTrue(Assert.java:42)
	at kafka.api.ConsumerBounceTest.waitForRebalance$1(ConsumerBounceTest.scala:427)
	at kafka.api.ConsumerBounceTest.checkCloseDuringRebalance(ConsumerBounceTest.scala:441)
	at kafka.api.ConsumerBounceTest.testCloseDuringRebalance(ConsumerBounceTest.scala:404)

Standard Output
...
[2019-04-19 06:05:33,736] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition group-max-size-test-4 at offset 0 (kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2019-04-19 06:05:37,891] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=group-max-size-test] Attempt to join group failed due to fatal error: Consumer group The consumer group has reached its max size. already has the configured maximum number of members. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:556)
[2019-04-19 06:05:37,892] ERROR [daemon-consumer-assignment]: Error due to (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76)
org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group group-max-size-test already has the configured maximum number of members.

and

java.lang.AssertionError: timed out waiting for /tmp/kafka-6739400839044013844 to be offline
	at kafka.utils.TestUtils$.fail(TestUtils.scala:382)
	at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:792)
	at kafka.server.AlterReplicaLogDirsRequestTest.testAlterReplicaLogDirsRequestErrorCode(AlterReplicaLogDirsRequestTest.scala:102)

Standard Output
[2019-04-19 07:00:11,458] ERROR [ReplicaManager broker=0] Error while changing replica dir for partition topic-0 (kafka.server.ReplicaManager:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Partition topic-0 doesn't exist
[2019-04-19 07:00:11,458] ERROR [ReplicaManager broker=0] Error while changing replica dir for partition topic-2 (kafka.server.ReplicaManager:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Partition topic-2 doesn't exist
[2019-04-19 07:00:11,458] ERROR [ReplicaManager broker=0] Error while changing replica dir for partition topic-1 (kafka.server.ReplicaManager:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Partition topic-1 doesn't exist
[2019-04-19 07:00:11,458] ERROR [ReplicaManager broker=0] Error while changing replica dir for partition topic-4 (kafka.server.ReplicaManager:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Partition topic-4 doesn't exist
[2019-04-19 07:00:11,459] ERROR [ReplicaManager broker=0] Error while changing replica dir for partition topic-3 (kafka.server.ReplicaManager:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Partition topic-3 doesn't exist
[2019-04-19 07:00:16,562] ERROR [ReplicaManager broker=0] Error while changing replica dir for partition topic-1 (kafka.server.ReplicaManager:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Partition topic-1 doesn't exist
[2019-04-19 07:00:17,076] ERROR  (kafka.server.LogDirFailureChannel:76)
java.io.IOException
	at kafka.server.AlterReplicaLogDirsRequestTest.testAlterReplicaLogDirsRequestErrorCode(AlterReplicaLogDirsRequestTest.scala:101)

Java8 passed.

Retest this please.

@hachikuji
Copy link
Contributor

@mjsax I submitted a more complete fix: #6613. I am happy to either let you merge this PR or close it.

@mjsax
Copy link
Member Author

mjsax commented Apr 20, 2019

It's contained in your PR. Closing this.

@mjsax mjsax closed this Apr 20, 2019
@mjsax mjsax deleted the kafka-8248-fix-producer-illegalstateexception branch April 20, 2019 18:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
2 participants