Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14960: TopicMetadata request manager #14386

Merged
merged 32 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
33a221e
KAFKA-14960: [Part I]TopicMetadataRequestManager Implementation (#7)
philipnee Jun 8, 2023
a350729
rebase
philipnee Sep 8, 2023
76d5c96
KAFKA-14274, #3: Introduce IdempotentCloser to ensure resources are o…
kirktrue May 24, 2023
d33188b
refactor existing code
philipnee Sep 8, 2023
ceb8694
Fixing uncompleted future
philipnee Sep 9, 2023
3b7400b
KAFKA-14960: [Part I]TopicMetadataRequestManager Implementation (#7)
philipnee Jun 8, 2023
1ac4148
rebase
philipnee Sep 8, 2023
875abb0
KAFKA-14274, #3: Introduce IdempotentCloser to ensure resources are o…
kirktrue May 24, 2023
a8be7ba
refactor existing code
philipnee Sep 8, 2023
227d9b6
Revert "KAFKA-14274, #3: Introduce IdempotentCloser to ensure resourc…
philipnee Sep 11, 2023
ae44539
Clean up sad unused code
philipnee Sep 11, 2023
4b2a7f7
KAFKA-14960: [Part I]TopicMetadataRequestManager Implementation (#7)
philipnee Jun 8, 2023
74b4960
rebase
philipnee Sep 8, 2023
c89958c
KAFKA-14274, #3: Introduce IdempotentCloser to ensure resources are o…
kirktrue May 24, 2023
d4e874a
refactor existing code
philipnee Sep 8, 2023
b86bc87
Fixing uncompleted future
philipnee Sep 9, 2023
d236f1b
KAFKA-14960: [Part I]TopicMetadataRequestManager Implementation (#7)
philipnee Jun 8, 2023
09463d2
rebase
philipnee Sep 8, 2023
6a00f7e
KAFKA-14274, #3: Introduce IdempotentCloser to ensure resources are o…
kirktrue May 24, 2023
381730f
refactor existing code
philipnee Sep 8, 2023
530535d
Revert "KAFKA-14274, #3: Introduce IdempotentCloser to ensure resourc…
philipnee Sep 11, 2023
8a0687c
Clean up sad unused code
philipnee Sep 11, 2023
26d59dd
Update DefaultBackgroundThreadTest.java
philipnee Sep 13, 2023
d1bada5
Merge branch 'cherry-pick-topic-metadata' of github.com:philipnee/kaf…
philipnee Sep 13, 2023
ffc66e9
Address comments on reviews.
philipnee Sep 14, 2023
6c06137
PR comments
philipnee Sep 15, 2023
ec600b3
PR comment - passing responseTime from ClientResponse
philipnee Sep 18, 2023
403e122
addressed more comments
philipnee Sep 18, 2023
269cbcb
Merge branch 'trunk' into cherry-pick-topic-metadata
philipnee Sep 18, 2023
179cd5d
clean up
philipnee Sep 18, 2023
628cb4c
clean up
philipnee Sep 18, 2023
6473597
Update CommitRequestManager.java
philipnee Sep 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
Expand All @@ -27,7 +26,6 @@
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -91,7 +89,7 @@ public CommitRequestManager(
}

/**
* Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest} request if there's any. The function will
* Poll for the {@link OffsetFetchRequest} and {@link org.apache.kafka.common.requests.OffsetCommitRequest} request if there's any. The function will
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to spell out the package name for OffsetCommitRequest? It doesn't seem this is done consistently.

* also try to autocommit the offsets, if feature is enabled.
*/
@Override
Expand All @@ -106,6 +104,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
}

pendingRequests.inflightOffsetFetches.forEach(System.out::println);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intended?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦

return new NetworkClientDelegate.PollResult(Long.MAX_VALUE,
Collections.unmodifiableList(pendingRequests.drain(currentTimeMs)));
}
Expand All @@ -127,9 +126,9 @@ public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets

/**
* Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an
* {@link OffsetCommitRequestState} and enqueue it to send later.
* {@link OffsetCommitRequest} and enqueue it to send later.
*/
public CompletableFuture<ClientResponse> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
return pendingRequests.addOffsetCommitRequest(offsets);
}

Expand All @@ -145,19 +144,18 @@ public void updateAutoCommitTimer(final long currentTimeMs) {
this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs));
}


// Visible for testing
List<OffsetFetchRequestState> unsentOffsetFetchRequests() {
return pendingRequests.unsentOffsetFetches;
}

// Visible for testing
Queue<OffsetCommitRequestState> unsentOffsetCommitRequests() {
Queue<OffsetCommitRequest> unsentOffsetCommitRequests() {
return pendingRequests.unsentOffsetCommits;
}

// Visible for testing
CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
CompletableFuture<Void> sendAutoCommit(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
log.debug("Enqueuing autocommit offsets: {}", allConsumedOffsets);
return this.addOffsetCommitRequest(allConsumedOffsets)
.whenComplete((response, throwable) -> {
Expand All @@ -177,28 +175,24 @@ CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, Offse
});
}

private class OffsetCommitRequestState {
private class OffsetCommitRequest {
private final Map<TopicPartition, OffsetAndMetadata> offsets;
private final String groupId;
private final GroupState.Generation generation;
private final String groupInstanceId;
private final NetworkClientDelegate.FutureCompletionHandler future;
private final CompletableFuture<Void> future;

public OffsetCommitRequestState(final Map<TopicPartition, OffsetAndMetadata> offsets,
final String groupId,
final String groupInstanceId,
final GroupState.Generation generation) {
public OffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets,
final String groupId,
final String groupInstanceId,
final GroupState.Generation generation) {
this.offsets = offsets;
this.future = new NetworkClientDelegate.FutureCompletionHandler();
this.future = new CompletableFuture<>();
this.groupId = groupId;
this.generation = generation;
this.groupInstanceId = groupInstanceId;
}

public CompletableFuture<ClientResponse> future() {
return future.future();
}

public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
Expand All @@ -220,7 +214,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
requestTopicDataMap.put(topicPartition.topic(), topic);
}

OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
org.apache.kafka.common.requests.OffsetCommitRequest.Builder builder = new org.apache.kafka.common.requests.OffsetCommitRequest.Builder(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we import the package?

new OffsetCommitRequestData()
.setGroupId(this.groupId)
.setGenerationIdOrMemberEpoch(generation.generationId)
Expand All @@ -230,15 +224,20 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
return new NetworkClientDelegate.UnsentRequest(
builder,
coordinatorRequestManager.coordinator(),
future);
(response, throwable) -> {
if (throwable == null) {
this.future.complete(null);
} else {
this.future.completeExceptionally(throwable);
}
});
}
}

private class OffsetFetchRequestState extends RequestState {
public final Set<TopicPartition> requestedPartitions;
public final GroupState.Generation requestedGeneration;
public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;

CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be private?

public OffsetFetchRequestState(final Set<TopicPartition> partitions,
final GroupState.Generation generation,
final long retryBackoffMs,
Expand All @@ -259,13 +258,10 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTim
true,
new ArrayList<>(this.requestedPartitions),
throwOnFetchStableOffsetUnsupported);
NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
return new NetworkClientDelegate.UnsentRequest(
builder,
coordinatorRequestManager.coordinator());
unsentRequest.future().whenComplete((r, t) -> {
onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody());
});
return unsentRequest;
coordinatorRequestManager.coordinator(),
(r, t) -> onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody()));
}

public void onResponse(
Expand Down Expand Up @@ -298,8 +294,8 @@ private void onFailure(final long currentTimeMs,
}

private void retry(final long currentTimeMs) {
onFailedAttempt(currentTimeMs);
onSendAttempt(currentTimeMs);
this.onFailedAttempt(currentTimeMs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currentTimeMs is when the request is sent, not when the response is receive. Could we pass along ClientResponse.receivedTimeMs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed, see line #259

this.onSendAttempt(currentTimeMs);
pendingRequests.addOffsetFetchRequest(this);
}

Expand Down Expand Up @@ -359,12 +355,12 @@ private void onSuccess(final long currentTimeMs,
}
}

private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> otherFuture) {
return this.future.whenComplete((r, t) -> {
if (t != null) {
future.completeExceptionally(t);
otherFuture.completeExceptionally(t);
} else {
future.complete(r);
otherFuture.complete(r);
}
});
}
Expand All @@ -381,41 +377,41 @@ public String toString() {
}

/**
* <p>This is used to stage the unsent {@link OffsetCommitRequestState} and {@link OffsetFetchRequestState}.
* <p>This is used to stage the unsent {@link OffsetCommitRequest} and {@link OffsetFetchRequestState}.
* <li>unsentOffsetCommits holds the offset commit requests that have not been sent out</>
* <li>unsentOffsetFetches holds the offset fetch requests that have not been sent out</li>
* <li>inflightOffsetFetches holds the offset fetch requests that have been sent out but incompleted</>.
*
* <li>inflightOffsetFetches holds the offset fetch requests that have been sent out but not completed</>.
* <p>
* {@code addOffsetFetchRequest} dedupes the requests to avoid sending the same requests.
*/

class PendingRequests {
// Queue is used to ensure the sequence of commit
Queue<OffsetCommitRequestState> unsentOffsetCommits = new LinkedList<>();
Queue<OffsetCommitRequest> unsentOffsetCommits = new LinkedList<>();
List<OffsetFetchRequestState> unsentOffsetFetches = new ArrayList<>();
List<OffsetFetchRequestState> inflightOffsetFetches = new ArrayList<>();

public boolean hasUnsentRequests() {
boolean hasUnsentRequests() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be private? Ditto below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry - I left out commit. Fixing those obvious mistakes right away.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry - leaving this protected as it is used in one of the test to verify there's no more unsent request.

return !unsentOffsetCommits.isEmpty() || !unsentOffsetFetches.isEmpty();
}

public CompletableFuture<ClientResponse> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
// TODO: Dedupe committing the same offsets to the same partitions
OffsetCommitRequestState request = new OffsetCommitRequestState(
OffsetCommitRequest request = new OffsetCommitRequest(
offsets,
groupState.groupId,
groupState.groupInstanceId.orElse(null),
groupState.generation);
unsentOffsetCommits.add(request);
return request.future();
return request.future;
}

/**
* <p>Adding an offset fetch request to the outgoing buffer. If the same request was made, we chain the future
* to the existing one.
* <p>Adding an offset fetch request to the outgoing buffer. If the same request was made, we chain the future
* to the existing one.
*
* <p>If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches}
* upon completion.</>
* <p>If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches}
* upon completion.</>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: wrong tag </> and other unclosed ones above

*/
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(final OffsetFetchRequestState request) {
Optional<OffsetFetchRequestState> dupe =
Expand Down Expand Up @@ -451,17 +447,17 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetch
/**
* Clear {@code unsentOffsetCommits} and moves all the sendable request in {@code unsentOffsetFetches} to the
* {@code inflightOffsetFetches} to bookkeep all of the inflight requests.
*
* <p>
Copy link
Collaborator

@lianetm lianetm Sep 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was the intention here to add an extra line in-between? should be <p/> if so

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😓 - must be the residual from some edits..

* Note: Sendable requests are determined by their timer as we are expecting backoff on failed attempt. See
* {@link RequestState}.
**/
public List<NetworkClientDelegate.UnsentRequest> drain(final long currentTimeMs) {
List<NetworkClientDelegate.UnsentRequest> drain(final long currentTimeMs) {
List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();

// Add all unsent offset commit requests to the unsentRequests list
unsentRequests.addAll(
unsentOffsetCommits.stream()
.map(OffsetCommitRequestState::toUnsentRequest)
.map(OffsetCommitRequest::toUnsentRequest)
.collect(Collectors.toList()));

// Partition the unsent offset fetch requests into sendable and non-sendable lists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class DefaultBackgroundThread extends KafkaThread {
private final RequestManagers requestManagers;

// Visible for testing
@SuppressWarnings("ParameterNumber")
DefaultBackgroundThread(final Time time,
final ConsumerConfig config,
final LogContext logContext,
Expand All @@ -89,7 +90,8 @@ public class DefaultBackgroundThread extends KafkaThread {
final GroupState groupState,
final CoordinatorRequestManager coordinatorManager,
final CommitRequestManager commitRequestManager,
final OffsetsRequestManager offsetsRequestManager) {
final OffsetsRequestManager offsetsRequestManager,
final TopicMetadataRequestManager topicMetadataRequestManager) {
super(BACKGROUND_THREAD_NAME, true);
this.time = time;
this.running = true;
Expand All @@ -102,9 +104,9 @@ public class DefaultBackgroundThread extends KafkaThread {
this.networkClientDelegate = networkClient;
this.errorEventHandler = errorEventHandler;
this.groupState = groupState;

this.requestManagers = new RequestManagers(
offsetsRequestManager,
topicMetadataRequestManager,
Optional.ofNullable(coordinatorManager),
Optional.ofNullable(commitRequestManager));
}
Expand Down Expand Up @@ -168,6 +170,9 @@ public DefaultBackgroundThread(final Time time,
logContext);
CoordinatorRequestManager coordinatorRequestManager = null;
CommitRequestManager commitRequestManager = null;
TopicMetadataRequestManager topicMetadataRequestManger = new TopicMetadataRequestManager(
logContext,
config);

if (groupState.groupId != null) {
coordinatorRequestManager = new CoordinatorRequestManager(
Expand All @@ -187,15 +192,14 @@ public DefaultBackgroundThread(final Time time,
}

this.requestManagers = new RequestManagers(
offsetsRequestManager,
Optional.ofNullable(coordinatorRequestManager),
Optional.ofNullable(commitRequestManager));

offsetsRequestManager,
topicMetadataRequestManger,
Optional.ofNullable(coordinatorRequestManager),
Optional.ofNullable(commitRequestManager));
this.applicationEventProcessor = new ApplicationEventProcessor(
backgroundEventQueue,
requestManagers,
metadata);

backgroundEventQueue,
requestManagers,
metadata);
} catch (final Exception e) {
close();
throw new KafkaException("Failed to construct background processor", e.getCause());
Expand All @@ -216,7 +220,7 @@ public void run() {
}
} catch (final Throwable t) {
log.error("The background thread failed due to unexpected error", t);
throw new RuntimeException(t);
throw new KafkaException(t);
} finally {
close();
log.debug("{} closed", getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/**
* A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle network poll and send operations.
Expand Down Expand Up @@ -203,16 +204,17 @@ public static class UnsentRequest {
private Timer timer;

public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder, final Optional<Node> node) {
this(requestBuilder, node, new FutureCompletionHandler());
Objects.requireNonNull(requestBuilder);
this.requestBuilder = requestBuilder;
this.node = node;
this.handler = new FutureCompletionHandler();
}

public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
final Optional<Node> node,
final FutureCompletionHandler handler) {
Objects.requireNonNull(requestBuilder);
this.requestBuilder = requestBuilder;
this.node = node;
this.handler = handler;
final BiConsumer<ClientResponse, Throwable> callback) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor seems unused?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is according to my IDE. I think it is used in CommitRequestManager (line 219 and 256) and TopicMetadataRequestManger (line 145)

this(requestBuilder, node);
this.handler.future.whenComplete(callback);
}

public void setTimer(final Time time, final long requestTimeoutMs) {
Expand Down Expand Up @@ -254,10 +256,6 @@ public void onFailure(final RuntimeException e) {
future.completeExceptionally(e);
}

public CompletableFuture<ClientResponse> future() {
return future;
}

@Override
public void onComplete(final ClientResponse response) {
if (response.authenticationException() != null) {
Expand All @@ -271,5 +269,4 @@ public void onComplete(final ClientResponse response) {
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo
commitCallback.onComplete(offsets, null);
}
}).exceptionally(e -> {
System.out.println(e);
throw new KafkaException(e);
});
}
Expand Down
Loading