-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14960: TopicMetadata request manager #14386
Conversation
TopicMetadataRequestManager implementation --------- Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
…nly closed once Added IdempotentCloser to ensure resources are only closed once. While this class is broadly useful, it will be constrained to consumer internals until a later date.
TopicMetadataRequestManager implementation --------- Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
…nly closed once Added IdempotentCloser to ensure resources are only closed once. While this class is broadly useful, it will be constrained to consumer internals until a later date.
…es are only closed once" This reverts commit 76d5c96.
Update CommitRequestManager.java clean up clean up remove idempotent references
TopicMetadataRequestManager implementation --------- Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
…nly closed once Added IdempotentCloser to ensure resources are only closed once. While this class is broadly useful, it will be constrained to consumer internals until a later date.
TopicMetadataRequestManager implementation --------- Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
…nly closed once Added IdempotentCloser to ensure resources are only closed once. While this class is broadly useful, it will be constrained to consumer internals until a later date.
…es are only closed once" This reverts commit 76d5c96.
Update CommitRequestManager.java clean up clean up remove idempotent references
…ka into cherry-pick-topic-metadata
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@philipnee : Thanks for the PR. Made a pass of non-testing files. Left a few comments.
@@ -91,7 +89,7 @@ public CommitRequestManager( | |||
} | |||
|
|||
/** | |||
* Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest} request if there's any. The function will | |||
* Poll for the {@link OffsetFetchRequest} and {@link org.apache.kafka.common.requests.OffsetCommitRequest} request if there's any. The function will |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to spell out the package name for OffsetCommitRequest? It doesn't seem this is done consistently.
@@ -220,7 +214,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { | |||
requestTopicDataMap.put(topicPartition.topic(), topic); | |||
} | |||
|
|||
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder( | |||
org.apache.kafka.common.requests.OffsetCommitRequest.Builder builder = new org.apache.kafka.common.requests.OffsetCommitRequest.Builder( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we import the package?
|
||
public String topic() { | ||
return topic; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to add toString
, equals
and hashcode
?
/** | ||
* <p> | ||
* Manages the state of topic metadata requests. This manager returns a | ||
* {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to spell out the package name?
* be sent. Specifically, this manager handles the following user API calls: | ||
* </p> | ||
* <ul> | ||
* <li>listOffsets</li> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this true? OffsetsRequestManager
says it handles listOffsets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤦 - listTopics() instead.
throw new RuntimeException(exception.get()); | ||
this.offsets = Collections.unmodifiableMap(offsets); | ||
|
||
for (OffsetAndMetadata offsetAndMetadata : offsets.values()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an existing issue. Could we change toString
to include the fields in the parent class? Do we need to add equals
and hashcode
for consistency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are quite a few events out there that need to be refactored if we do it this way. Would it be ok for you to skip this for now and I will post a MINOR patch to address after this is closed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If your PR goes in before mine, you can make a comment and I'll add the toString
changes into mine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's fine to clean this up separately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -106,6 +104,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { | |||
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); | |||
} | |||
|
|||
pendingRequests.inflightOffsetFetches.forEach(System.out::println); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this intended?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤦
} | ||
} | ||
|
||
private class OffsetFetchRequestState extends RequestState { | ||
public final Set<TopicPartition> requestedPartitions; | ||
public final GroupState.Generation requestedGeneration; | ||
public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future; | ||
|
||
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this be private?
List<OffsetFetchRequestState> unsentOffsetFetches = new ArrayList<>(); | ||
List<OffsetFetchRequestState> inflightOffsetFetches = new ArrayList<>(); | ||
|
||
public boolean hasUnsentRequests() { | ||
boolean hasUnsentRequests() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this be private? Ditto below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry - I left out commit. Fixing those obvious mistakes right away.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry - leaving this protected as it is used in one of the test to verify there's no more unsent request.
this.requestBuilder = requestBuilder; | ||
this.node = node; | ||
this.handler = handler; | ||
final BiConsumer<ClientResponse, Throwable> callback) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This constructor seems unused?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is according to my IDE. I think it is used in CommitRequestManager (line 219 and 256) and TopicMetadataRequestManger (line 145)
Hey @junrao - Addressed your comments. I'm holding off on your request for |
CompletableFuture<Map<String, List<PartitionInfo>>> future; | ||
|
||
public TopicMetadataRequestState(final LogContext logContext, | ||
final Optional<String> topic, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation
private MockTime time; | ||
private TopicMetadataRequestManager topicMetadataRequestManager; | ||
|
||
private Properties props; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only used in the setup so no need to keep it at the class level
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@philipnee : Thanks for the updated PR. Made a pass of all files. A few more comments.
* </p> | ||
* <ul> | ||
* <li>listOffsets</li> | ||
* <li>partitionsFor</li> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is fine.
future.complete(res); | ||
inflightRequests.remove(topic); | ||
} catch (RetriableException e) { | ||
if (e instanceof TimeoutException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This captures the case when a single request takes long on the broker and times out. However, a separate case is that individual request returns on time but with other RetriableException. In that case, TopicMetadataRequestManager will not clean up inflightRequests even when the client request has timed out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @junrao - Sorry for writing such a long reply - I really need to brush up my memory about the network module as well as the TopicMetadataRequest...
In the current code I think the following is how responses should be handled.
For the case of hard failure that the callback receives a non-null exception:
We should remove the inflight request for all non-retriable and TimeoutException because we need to fail the user requests and topic the retry. However, we should continue to retry for RetriableException until timer runs out.
For the case of soft failure (the server respond with an error code), it seems like the right thing to do is to continue to retry except for the non-retriable exception.
It is worth noting that: If the request timed-out (API level timeout), callback is completed with TimeoutException hard failure (in the network delegate module), which should eventually causes the exception to be purged from the inflight request as mentioned above.
To your question: if the responses include a retriable exception, I believe the right thing to do is to continue to retry until the timer runs out.
In fact - it seems like if (e instanceof TimeoutException)
(that I added in the last commit) is actually incorrect. Because it is a RetriableException at it should be retried per:
if (error.exception() instanceof RetriableException)
shouldRetry = true;
What's missing here is handling the hard RetriableException and TimeoutException. I will fix the code and add test cases to cover the scenarios above.
throw new RuntimeException(exception.get()); | ||
this.offsets = Collections.unmodifiableMap(offsets); | ||
|
||
for (OffsetAndMetadata offsetAndMetadata : offsets.values()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's fine to clean this up separately.
|
||
backgroundThread.runOnce(); | ||
verify(applicationEventProcessor).process(any(AssignmentChangeApplicationEvent.class)); | ||
verify(networkClient, times(1)).poll(anyLong(), anyLong()); | ||
verify(commitManager, times(1)).updateAutoCommitTimer(currentTimeMs); | ||
verify(commitManager, times(1)).maybeAutoCommit(offset); | ||
|
||
backgroundThread.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that we close backgroundThread in other cases?
try { | ||
future.get(); | ||
} catch (Throwable e) { | ||
System.out.println(e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, we should use logging instead of print.
Address PR Update TopicMetadataRequestManager.java
handleException(exception, responseTimeMs); | ||
return; | ||
} | ||
handleResponse(response, responseTimeMs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that the exception handling logic here ends up in 2 places. Apart from the initial handleException
, this handleResponse
could be as well end up handling exceptions, in a very similar way (catch block on handleResponse
vs the handleException
). If we prefer to keep them separated because of the "retryOnTimeout
" part, which seems to be what makes them different, it would be helpful maybe to add a comment to describe that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, @lianetm - Do you think it would be clearer to keep them in 1 place? The intention was to handle hard failures in handleException and soft failures (i.e. returning an error code) in handleResponse.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be clearer and better to maintain/troubleshoot if error handling is in one place. Maybe having a single handleException(final Throwable exception, final long responseTimeMs, boolean retryOnTimeout)
, called in both cases:
if (exception != null) {
// Handle hard errors
handleException(exception, responseTimeMs, false);
...
}
...
catch (Exception e) {
// Handle soft errors
handleException(e, responseTimeMs, true);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@philipnee : Thanks for the updated PR. A few more comments.
|
||
private void processResponseOrException(final ClientResponse response, | ||
final Throwable exception) { | ||
long responseTimeMs = System.currentTimeMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we pass in currentTime from the caller?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Jun - understood making repeated system calls is bad. However - we do need to get the time when the response is received, so this is kind of the lazy way to do it.
I think it is possible to expose the time during the callback/future completion during the network poll (because it is computed at the top of the background thread loop and pass down to network poll.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, could we use ClientResponse.receivedTimeMs
?
} | ||
|
||
private void handleException(final Throwable exception, final long responseTimeMs) { | ||
if (exception instanceof TimeoutException || !(exception instanceof RetriableException)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TimeoutException is a RetriableException. Should we just treat it as other RetriableException? That seems to be what the old consumer does.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for now - the manager will continue to retry all RetriableException including TimeoutException. Currently, we don't have a way to cancel the request when the user API request times-out, so this is a gap there, but a jira is filed here: https://issues.apache.org/jira/browse/KAFKA-15475
private final LogContext logContext; | ||
|
||
public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { | ||
this.logContext = logContext; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we just get rid of this
? Ditto below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry - got into the habit of using this
. 😅
final long retryBackoffMaxMs) { | ||
super(logContext, TopicMetadataRequestState.class.getSimpleName(), retryBackoffMs, | ||
retryBackoffMaxMs); | ||
this.future = new CompletableFuture<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we just get rid of this
?
* {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest} if needed. | ||
*/ | ||
private Optional<NetworkClientDelegate.UnsentRequest> send(final long currentTimeMs) { | ||
if (!this.canSendRequest(currentTimeMs)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be useful to be consistent in convention. If we want to add this
in front of methods in the parent class, we want to do this consistently.
future.complete(res); | ||
inflightRequests.remove(topic); | ||
} catch (RetriableException e) { | ||
onFailedAttempt(responseTimeMs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The case that I am wondering is the following.
DefaultBackgroundThread is in a retry loop for an inflightRequest. PrototypeAsyncConsumer.partitionsFor eventually calls CompletableApplicationEvent.get(Timer timer) and times out. Do we still keep the request in inflightRequests at that point? Is that a concern?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Jun for raising this point - I think what we should do is to complete the request with a non-retriable exception to prevent the request from retrying. Otherwise - the request manager will never remove the request from the inflightRequest.
@@ -298,8 +289,8 @@ private void onFailure(final long currentTimeMs, | |||
} | |||
|
|||
private void retry(final long currentTimeMs) { | |||
onFailedAttempt(currentTimeMs); | |||
onSendAttempt(currentTimeMs); | |||
this.onFailedAttempt(currentTimeMs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currentTimeMs is when the request is sent, not when the response is receive. Could we pass along ClientResponse.receivedTimeMs
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed, see line #259
|
||
private void processResponseOrException(final ClientResponse response, | ||
final Throwable exception) { | ||
long responseTimeMs = System.currentTimeMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, could we use ClientResponse.receivedTimeMs
?
920d63b
to
ec600b3
Compare
@@ -38,8 +38,8 @@ | |||
* Whether there is an existing coordinator. | |||
* Whether there is an inflight request. | |||
* Whether the backoff timer has expired. | |||
* The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just cleaning up long references
@junrao @lianetm - I hope I've addressed most, if not all, comments that you pointed out. So, thank you both for spending time reviewing my code. We did have an offline discussion about user API timeout handling, and a bug report is filed here KAFKA-15475. I will submit a patch after this PR to address this small gap. Lastly, here are the failing tests:
I believe these are flaky. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@philipnee : Thanks for the updated PR. Just a few minor comments.
|
||
public TopicMetadataRequestManager(final LogContext context, final ConsumerConfig config) { | ||
logContext = context; | ||
log = logContext.logger(this.getClass()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we remove this
?
} | ||
|
||
private void completeFutureAndRemoveRequest(final Throwable throwable) { | ||
this.future.completeExceptionally(throwable); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we remove this
?
@@ -91,7 +90,7 @@ public CommitRequestManager( | |||
} | |||
|
|||
/** | |||
* Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest} request if there's any. The function will | |||
* Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequestState} request if there's any. The function will |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, OffsetCommitRequestState should be OffsetCommitRequest, right?
@@ -395,27 +386,28 @@ class PendingRequests { | |||
List<OffsetFetchRequestState> unsentOffsetFetches = new ArrayList<>(); | |||
List<OffsetFetchRequestState> inflightOffsetFetches = new ArrayList<>(); | |||
|
|||
public boolean hasUnsentRequests() { | |||
// Visible for teseting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo teseting
* <p>If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches} | ||
* upon completion.</> | ||
* <p>If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches} | ||
* upon completion.</> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: wrong tag </> and other unclosed ones above
@@ -451,11 +443,11 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetch | |||
/** | |||
* Clear {@code unsentOffsetCommits} and moves all the sendable request in {@code unsentOffsetFetches} to the | |||
* {@code inflightOffsetFetches} to bookkeep all of the inflight requests. | |||
* | |||
* <p> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was the intention here to add an extra line in-between? should be <p/>
if so
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😓 - must be the residual from some edits..
Updates based on review comments
@junrao @lianetm - Thanks for another round of reviews. The latest build passed with the following test failures
I believe they are irrelevant. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@philipnee : Thanks for the latest PR. LGTM
TopicMetadataRequestManager is responsible for sending topic metadata requests. The manager manages API requests and build the request accordingly. All topic metadata requests are chained, if requesting the same topic, to avoid sending requests repeatedly. Co-authored-by: Lianet Magrans <lmagrans@confluent.io> Co-authored-by: Kirk True <kirk@kirktrue.pro> Reviewers: Kirk True <kirk@kirktrue.pro>, Lianet Magrans <lianetmr@gmail.com>, Jun Rao <junrao@gmail.com>
TopicMetadataRequestManager is responsible for sending topic metadata requests. The manager manages API requests and build the request accordingly. All topic metadata requests are chained, if requesting the same topic, to avoid sending requests repeatedly. Co-authored-by: Lianet Magrans <lmagrans@confluent.io> Co-authored-by: Kirk True <kirk@kirktrue.pro> Reviewers: Kirk True <kirk@kirktrue.pro>, Lianet Magrans <lianetmr@gmail.com>, Jun Rao <junrao@gmail.com>
TopicMetadataRequestManager is responsible for sending topic metadata requests. The manager manages API requests and build the request accordingly. All topic metadata requests are chained, if requesting the same topic, to avoid sending requests repeatedly. Co-authored-by: Lianet Magrans <lmagrans@confluent.io> Co-authored-by: Kirk True <kirk@kirktrue.pro> Reviewers: Kirk True <kirk@kirktrue.pro>, Lianet Magrans <lianetmr@gmail.com>, Jun Rao <junrao@gmail.com>
TopicMetadataRequestManager is responsible for sending topic metadata requests. The manager manages API requests and build the request accordingly. All topic metadata requests are chained, if requesting the same topic, to avoid sending requests repeatedly.