Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14960: TopicMetadata request manager #14386

Merged
merged 32 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
33a221e
KAFKA-14960: [Part I]TopicMetadataRequestManager Implementation (#7)
philipnee Jun 8, 2023
a350729
rebase
philipnee Sep 8, 2023
76d5c96
KAFKA-14274, #3: Introduce IdempotentCloser to ensure resources are o…
kirktrue May 24, 2023
d33188b
refactor existing code
philipnee Sep 8, 2023
ceb8694
Fixing uncompleted future
philipnee Sep 9, 2023
3b7400b
KAFKA-14960: [Part I]TopicMetadataRequestManager Implementation (#7)
philipnee Jun 8, 2023
1ac4148
rebase
philipnee Sep 8, 2023
875abb0
KAFKA-14274, #3: Introduce IdempotentCloser to ensure resources are o…
kirktrue May 24, 2023
a8be7ba
refactor existing code
philipnee Sep 8, 2023
227d9b6
Revert "KAFKA-14274, #3: Introduce IdempotentCloser to ensure resourc…
philipnee Sep 11, 2023
ae44539
Clean up sad unused code
philipnee Sep 11, 2023
4b2a7f7
KAFKA-14960: [Part I]TopicMetadataRequestManager Implementation (#7)
philipnee Jun 8, 2023
74b4960
rebase
philipnee Sep 8, 2023
c89958c
KAFKA-14274, #3: Introduce IdempotentCloser to ensure resources are o…
kirktrue May 24, 2023
d4e874a
refactor existing code
philipnee Sep 8, 2023
b86bc87
Fixing uncompleted future
philipnee Sep 9, 2023
d236f1b
KAFKA-14960: [Part I]TopicMetadataRequestManager Implementation (#7)
philipnee Jun 8, 2023
09463d2
rebase
philipnee Sep 8, 2023
6a00f7e
KAFKA-14274, #3: Introduce IdempotentCloser to ensure resources are o…
kirktrue May 24, 2023
381730f
refactor existing code
philipnee Sep 8, 2023
530535d
Revert "KAFKA-14274, #3: Introduce IdempotentCloser to ensure resourc…
philipnee Sep 11, 2023
8a0687c
Clean up sad unused code
philipnee Sep 11, 2023
26d59dd
Update DefaultBackgroundThreadTest.java
philipnee Sep 13, 2023
d1bada5
Merge branch 'cherry-pick-topic-metadata' of github.com:philipnee/kaf…
philipnee Sep 13, 2023
ffc66e9
Address comments on reviews.
philipnee Sep 14, 2023
6c06137
PR comments
philipnee Sep 15, 2023
ec600b3
PR comment - passing responseTime from ClientResponse
philipnee Sep 18, 2023
403e122
addressed more comments
philipnee Sep 18, 2023
269cbcb
Merge branch 'trunk' into cherry-pick-topic-metadata
philipnee Sep 18, 2023
179cd5d
clean up
philipnee Sep 18, 2023
628cb4c
clean up
philipnee Sep 18, 2023
6473597
Update CommitRequestManager.java
philipnee Sep 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
Expand Down Expand Up @@ -129,7 +128,7 @@ public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets
* Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an
* {@link OffsetCommitRequestState} and enqueue it to send later.
*/
public CompletableFuture<ClientResponse> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
return pendingRequests.addOffsetCommitRequest(offsets);
}

Expand All @@ -145,19 +144,13 @@ public void updateAutoCommitTimer(final long currentTimeMs) {
this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs));
}


// Visible for testing
List<OffsetFetchRequestState> unsentOffsetFetchRequests() {
return pendingRequests.unsentOffsetFetches;
}

// Visible for testing
Queue<OffsetCommitRequestState> unsentOffsetCommitRequests() {
return pendingRequests.unsentOffsetCommits;
}

// Visible for testing
CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
CompletableFuture<Void> sendAutoCommit(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
log.debug("Enqueuing autocommit offsets: {}", allConsumedOffsets);
return this.addOffsetCommitRequest(allConsumedOffsets)
.whenComplete((response, throwable) -> {
Expand All @@ -182,23 +175,19 @@ private class OffsetCommitRequestState {
private final String groupId;
private final GroupState.Generation generation;
private final String groupInstanceId;
private final NetworkClientDelegate.FutureCompletionHandler future;
private final CompletableFuture<Void> future;

public OffsetCommitRequestState(final Map<TopicPartition, OffsetAndMetadata> offsets,
final String groupId,
final String groupInstanceId,
final GroupState.Generation generation) {
this.offsets = offsets;
this.future = new NetworkClientDelegate.FutureCompletionHandler();
this.future = new CompletableFuture<>();
this.groupId = groupId;
this.generation = generation;
this.groupInstanceId = groupInstanceId;
}

public CompletableFuture<ClientResponse> future() {
return future.future();
}

public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
Expand Down Expand Up @@ -230,15 +219,20 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
return new NetworkClientDelegate.UnsentRequest(
builder,
coordinatorRequestManager.coordinator(),
future);
(response, throwable) -> {
if (throwable == null) {
future.complete(null);
} else {
future.completeExceptionally(throwable);
}
});
}
}

private class OffsetFetchRequestState extends RequestState {
public final Set<TopicPartition> requestedPartitions;
public final GroupState.Generation requestedGeneration;
public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;

private final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
public OffsetFetchRequestState(final Set<TopicPartition> partitions,
final GroupState.Generation generation,
final long retryBackoffMs,
Expand All @@ -253,19 +247,16 @@ public boolean sameRequest(final OffsetFetchRequestState request) {
return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
}

public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) {
public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
groupState.groupId,
true,
new ArrayList<>(this.requestedPartitions),
throwOnFetchStableOffsetUnsupported);
NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
return new NetworkClientDelegate.UnsentRequest(
builder,
coordinatorRequestManager.coordinator());
unsentRequest.future().whenComplete((r, t) -> {
onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody());
});
return unsentRequest;
coordinatorRequestManager.coordinator(),
(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));
}

public void onResponse(
Expand Down Expand Up @@ -359,12 +350,12 @@ private void onSuccess(final long currentTimeMs,
}
}

private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> otherFuture) {
return this.future.whenComplete((r, t) -> {
if (t != null) {
future.completeExceptionally(t);
otherFuture.completeExceptionally(t);
} else {
future.complete(r);
otherFuture.complete(r);
}
});
}
Expand All @@ -384,8 +375,8 @@ public String toString() {
* <p>This is used to stage the unsent {@link OffsetCommitRequestState} and {@link OffsetFetchRequestState}.
* <li>unsentOffsetCommits holds the offset commit requests that have not been sent out</>
* <li>unsentOffsetFetches holds the offset fetch requests that have not been sent out</li>
* <li>inflightOffsetFetches holds the offset fetch requests that have been sent out but incompleted</>.
*
* <li>inflightOffsetFetches holds the offset fetch requests that have been sent out but not completed</>.
* <p>
* {@code addOffsetFetchRequest} dedupes the requests to avoid sending the same requests.
*/

Expand All @@ -395,27 +386,28 @@ class PendingRequests {
List<OffsetFetchRequestState> unsentOffsetFetches = new ArrayList<>();
List<OffsetFetchRequestState> inflightOffsetFetches = new ArrayList<>();

public boolean hasUnsentRequests() {
// Visible for testing
boolean hasUnsentRequests() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be private? Ditto below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry - I left out commit. Fixing those obvious mistakes right away.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry - leaving this protected as it is used in one of the test to verify there's no more unsent request.

return !unsentOffsetCommits.isEmpty() || !unsentOffsetFetches.isEmpty();
}

public CompletableFuture<ClientResponse> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
// TODO: Dedupe committing the same offsets to the same partitions
OffsetCommitRequestState request = new OffsetCommitRequestState(
offsets,
groupState.groupId,
groupState.groupInstanceId.orElse(null),
groupState.generation);
unsentOffsetCommits.add(request);
return request.future();
return request.future;
}

/**
* <p>Adding an offset fetch request to the outgoing buffer. If the same request was made, we chain the future
* to the existing one.
* <p>Adding an offset fetch request to the outgoing buffer. If the same request was made, we chain the future
* to the existing one.
*
* <p>If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches}
* upon completion.</>
* <p>If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches}
* upon completion.
*/
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(final OffsetFetchRequestState request) {
Optional<OffsetFetchRequestState> dupe =
Expand Down Expand Up @@ -450,12 +442,11 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetch

/**
* Clear {@code unsentOffsetCommits} and moves all the sendable request in {@code unsentOffsetFetches} to the
* {@code inflightOffsetFetches} to bookkeep all of the inflight requests.
*
* {@code inflightOffsetFetches} to bookkeep all the inflight requests.
* Note: Sendable requests are determined by their timer as we are expecting backoff on failed attempt. See
* {@link RequestState}.
**/
public List<NetworkClientDelegate.UnsentRequest> drain(final long currentTimeMs) {
List<NetworkClientDelegate.UnsentRequest> drain(final long currentTimeMs) {
List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();

// Add all unsent offset commit requests to the unsentRequests list
Expand All @@ -472,7 +463,7 @@ public List<NetworkClientDelegate.UnsentRequest> drain(final long currentTimeMs)
// Add all sendable offset fetch requests to the unsentRequests list and to the inflightOffsetFetches list
for (OffsetFetchRequestState request : partitionedBySendability.get(true)) {
request.onSendAttempt(currentTimeMs);
unsentRequests.add(request.toUnsentRequest(currentTimeMs));
unsentRequests.add(request.toUnsentRequest());
inflightOffsetFetches.add(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
* Whether there is an existing coordinator.
* Whether there is an inflight request.
* Whether the backoff timer has expired.
* The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just cleaning up long references

* or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
* The {@link NetworkClientDelegate.PollResult} contains either a wait timer
* or a singleton list of {@link NetworkClientDelegate.UnsentRequest}.
* <p/>
* The {@link FindCoordinatorRequest} will be handled by the {@link #onResponse(long, FindCoordinatorResponse)} callback, which
* subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be
Expand Down Expand Up @@ -86,7 +86,7 @@ public CoordinatorRequestManager(
* Note that this method does not involve any actual network IO, and it only determines if we need to send a new request or not.
*
* @param currentTimeMs current time in ms.
* @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}. This will not be {@code null}.
* @return {@link NetworkClientDelegate.PollResult}. This will not be {@code null}.
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class DefaultBackgroundThread extends KafkaThread {
private final RequestManagers requestManagers;

// Visible for testing
@SuppressWarnings("ParameterNumber")
DefaultBackgroundThread(final Time time,
final ConsumerConfig config,
final LogContext logContext,
Expand All @@ -89,7 +90,8 @@ public class DefaultBackgroundThread extends KafkaThread {
final GroupState groupState,
final CoordinatorRequestManager coordinatorManager,
final CommitRequestManager commitRequestManager,
final OffsetsRequestManager offsetsRequestManager) {
final OffsetsRequestManager offsetsRequestManager,
final TopicMetadataRequestManager topicMetadataRequestManager) {
super(BACKGROUND_THREAD_NAME, true);
this.time = time;
this.running = true;
Expand All @@ -102,9 +104,9 @@ public class DefaultBackgroundThread extends KafkaThread {
this.networkClientDelegate = networkClient;
this.errorEventHandler = errorEventHandler;
this.groupState = groupState;

this.requestManagers = new RequestManagers(
offsetsRequestManager,
topicMetadataRequestManager,
Optional.ofNullable(coordinatorManager),
Optional.ofNullable(commitRequestManager));
}
Expand Down Expand Up @@ -169,6 +171,9 @@ public DefaultBackgroundThread(final Time time,
logContext);
CoordinatorRequestManager coordinatorRequestManager = null;
CommitRequestManager commitRequestManager = null;
TopicMetadataRequestManager topicMetadataRequestManger = new TopicMetadataRequestManager(
logContext,
config);

if (groupState.groupId != null) {
coordinatorRequestManager = new CoordinatorRequestManager(
Expand All @@ -188,15 +193,14 @@ public DefaultBackgroundThread(final Time time,
}

this.requestManagers = new RequestManagers(
offsetsRequestManager,
Optional.ofNullable(coordinatorRequestManager),
Optional.ofNullable(commitRequestManager));

offsetsRequestManager,
topicMetadataRequestManger,
Optional.ofNullable(coordinatorRequestManager),
Optional.ofNullable(commitRequestManager));
this.applicationEventProcessor = new ApplicationEventProcessor(
backgroundEventQueue,
requestManagers,
metadata);

backgroundEventQueue,
requestManagers,
metadata);
} catch (final Exception e) {
close();
throw new KafkaException("Failed to construct background processor", e.getCause());
Expand All @@ -217,7 +221,7 @@ public void run() {
}
} catch (final Throwable t) {
log.error("The background thread failed due to unexpected error", t);
throw new RuntimeException(t);
throw new KafkaException(t);
} finally {
close();
log.debug("{} closed", getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/**
* A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle network poll and send operations.
Expand Down Expand Up @@ -211,17 +212,19 @@ public static class UnsentRequest {
private Optional<Node> node; // empty if random node can be chosen
private Timer timer;

public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder, final Optional<Node> node) {
this(requestBuilder, node, new FutureCompletionHandler());
}

public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
final Optional<Node> node,
final FutureCompletionHandler handler) {
final Optional<Node> node) {
Objects.requireNonNull(requestBuilder);
this.requestBuilder = requestBuilder;
this.node = node;
this.handler = handler;
this.handler = new FutureCompletionHandler();
}

public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
final Optional<Node> node,
final BiConsumer<ClientResponse, Throwable> callback) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor seems unused?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is according to my IDE. I think it is used in CommitRequestManager (line 219 and 256) and TopicMetadataRequestManger (line 145)

this(requestBuilder, node);
this.handler.future.whenComplete(callback);
}

public void setTimer(final Time time, final long requestTimeoutMs) {
Expand Down Expand Up @@ -263,10 +266,6 @@ public void onFailure(final RuntimeException e) {
future.completeExceptionally(e);
}

public CompletableFuture<ClientResponse> future() {
return future;
}

@Override
public void onComplete(final ClientResponse response) {
if (response.authenticationException() != null) {
Expand All @@ -280,5 +279,4 @@ public void onComplete(final ClientResponse response) {
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState,

/**
* Determine if there are pending fetch offsets requests to be sent and build a
* {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}
* {@link NetworkClientDelegate.PollResult}
* containing it.
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo
commitCallback.onComplete(offsets, null);
}
}).exceptionally(e -> {
System.out.println(e);
throw new KafkaException(e);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,24 @@ public class RequestManagers {
public final Optional<CoordinatorRequestManager> coordinatorRequestManager;
public final Optional<CommitRequestManager> commitRequestManager;
public final OffsetsRequestManager offsetsRequestManager;
public final TopicMetadataRequestManager topicMetadataRequestManager;
private final List<Optional<? extends RequestManager>> entries;

public RequestManagers(OffsetsRequestManager offsetsRequestManager,
TopicMetadataRequestManager topicMetadataRequestManager,
Optional<CoordinatorRequestManager> coordinatorRequestManager,
Optional<CommitRequestManager> commitRequestManager) {
this.offsetsRequestManager = requireNonNull(offsetsRequestManager,
"OffsetsRequestManager cannot be null");
this.coordinatorRequestManager = coordinatorRequestManager;
this.commitRequestManager = commitRequestManager;
this.topicMetadataRequestManager = topicMetadataRequestManager;

List<Optional<? extends RequestManager>> list = new ArrayList<>();
list.add(coordinatorRequestManager);
list.add(commitRequestManager);
list.add(Optional.of(offsetsRequestManager));
list.add(Optional.of(topicMetadataRequestManager));
entries = Collections.unmodifiableList(list);
}

Expand Down
Loading