-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-8248; Ensure time updated before sending transactional request #6613
Conversation
@hachikuji Test failures seem to be related. |
@mjsax Thanks. Fixed now. There was some sloppiness in |
log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode); | ||
client.send(clientRequest, now); | ||
client.send(clientRequest, currentTimeMs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that this is the main fix. Do we really want to redefine 'now' in this method as you do on L455? It is used in multiple places in run
and I wonder if there will be side effects of using a different value here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.. We definitely have to update the time after the blocking operations above, but it's a fair point that the old now
continues to be used in run
afterwards. We actually have the same problem in maybeWaitForProducerId
, which is also a blocking operation. I don't see a way around it except to change the logic in run
to treat now
as more of a startTimeMs
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right that accounting for the blocking operations is key.
Initially, I thought the main issue with recomputing the currentTime
in every down stream call is that it will likely affect the validity of a bunch of the test cases in SenderTest
. But I looked at the cases they should be OK with such a change.
I think standardizing on treating now
as more of a startTimeMs
makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for reviewing. I ended up changing the signature so that it removes the timing assumption. Literally every call to this method was run(time.milliseconds())
, so I got rid of the argument and renamed to runOnce
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the refactoring too: we had similar pattern of passing now
in Streamthread loop but later I refactored it to advanceNowAndComputeLatency
whenever a blocking call is made for the same reason.
@@ -330,8 +330,9 @@ void run(long now) { | |||
} | |||
} | |||
|
|||
long pollTimeout = sendProducerData(now); | |||
client.poll(pollTimeout, now); | |||
long currentTimeMs = time.milliseconds(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this also be now
instead of currentTimeMs
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually favor currentTimeMs
, but perhaps it is unnecessarily pedantic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally don't care too much. Was just wondering about keeping the code consistent. If now
is the current way, we might want to keep it this way -- or rename it everywhere (but maybe in a separate PR). Just my 2 cents. Feel free to ignore.
@@ -452,11 +453,12 @@ private boolean maybeSendTransactionalRequest(long now) { | |||
if (targetNode != null) { | |||
if (nextRequestHandler.isRetry()) | |||
time.sleep(nextRequestHandler.retryBackoffMs()); | |||
long currentTimeMs = time.milliseconds(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this also be now
instead of currentTimeMs
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a minor comment, but LGTM otherwise. thanks!
client.poll(pollTimeout, now); | ||
long currentTimeMs = time.milliseconds(); | ||
long pollTimeout = sendProducerData(currentTimeMs); | ||
client.poll(pollTimeout, currentTimeMs); | ||
} | ||
|
||
private long sendProducerData(long now) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(This is a rewrite of the comment I thought I had left with my previous review)
With the changes in this patch, the path for the regular produce diverges from the transactional path: the former takes in a value of now
from runOnce
and uses it all the way through while the latter recomputes it. This is fine because the regular producer path has no blocking calls. It would be good add a comment explaining this at the top level.
log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode); | ||
client.send(clientRequest, now); | ||
client.send(clientRequest, currentTimeMs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the refactoring too: we had similar pattern of passing now
in Streamthread loop but later I refactored it to advanceNowAndComputeLatency
whenever a blocking call is made for the same reason.
@@ -59,6 +65,8 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc | |||
} | |||
|
|||
override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = { | |||
if (!NetworkClientUtils.awaitReady(client, sourceNode, time, 500)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not clear to me why adding this is necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made changes to MockClient
to make its behavior closer to NetworkClient
. One of the improvements is that you now have to call ready
before calling send
, which is just like NetworkClient
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack.
return notThrottled(now); | ||
|
||
case CONNECTING: | ||
if (now < readyDelayedUntilMs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: some functions are inlined while some others are wrapped in a private function like isUnreachable
. Maybe just put this one as a function as well?
// Nodes which have a delay before ultimately succeeding to connect | ||
private final TransientSet<Node> delayedReady; | ||
|
||
private final Map<String, ConnectionState> connections = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to clarify this refactoring is orthogonal to unit test augment for this bug fix specifically right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I needed it in order to add a good test case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack.
LGTM! I think we can merge post green builds. |
I filed https://issues.apache.org/jira/browse/INFRA-18318 for the jdk11 failure. I am going to go ahead and merge this. |
…6613) This patch fixes a bug in the sending of transactional requests. We need to call `KafkaClient.send` with an updated current time. Failing to do so can result in an `IllegalStateExcepton` which leaves the producer effectively dead since the in-flight correlation id has been set, but no request has been sent. To avoid the same problem in the future, we update the in flight correlationId only after sending the request. Reviewers: Matthias J. Sax <matthias@confluent.io>, Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
…pache#6613) This patch fixes a bug in the sending of transactional requests. We need to call `KafkaClient.send` with an updated current time. Failing to do so can result in an `IllegalStateExcepton` which leaves the producer effectively dead since the in-flight correlation id has been set, but no request has been sent. To avoid the same problem in the future, we update the in flight correlationId only after sending the request. Reviewers: Matthias J. Sax <matthias@confluent.io>, Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This patch fixes a bug in the sending of transactional requests. We need to call
KafkaClient.send
with an updated current time. Failing to do so can result in anIllegalStateExcepton
which leaves the producer effectively dead since the in-flight correlation id has been set, but no request has been sent. To avoid the same problem in the future, we update the in flight correlationId only after sending the request.Committer Checklist (excluded from commit message)