Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15305: The background thread should try to process the remaining task until the shutdown timer is expired. #16156

Merged
merged 7 commits into from
Jun 4, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,13 @@ private void closeInternal(final Duration timeout) {
* Check the unsent queue one last time and poll until all requests are sent or the timer runs out.
*/
private void sendUnsentRequests(final Timer timer) {
if (networkClientDelegate.unsentRequests().isEmpty())
if (!networkClientDelegate.hasAnyPendingRequests())
return;

do {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the early return?
If there is a request in in-flight but unSentRequestQueue is empty, we will lose the newest request.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, I think we don't need this early return statement in the current situation because we also need to consider if there are any in-flight requests.
The reason why we have this statement here is because we didn't consider the in-flight requests before, so we could return if there are no unsent requests. 🤔

networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
timer.update();
} while (timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty());
} while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use while-loop instead of do-while, since it is unnecessary to poll for nothing.

}

void cleanup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ public void poll(final long timeoutMs, final long currentTimeMs) {
checkDisconnects(currentTimeMs);
}

/**
* Return true if there is at least one in-flight request or unsent request.
*/
public boolean hasAnyPendingRequests() {
return client.hasInFlightRequests() || !unsentRequests.isEmpty();
}

/**
* Tries to send the requests in the unsentRequest queue. If the request doesn't have an assigned node, it will
* find the leastLoadedOne, and will be retried in the next {@code poll()}. If the request is expired, a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,27 @@ void testRunOnceInvokesReaper() {
verify(applicationEventReaper).reap(any(Long.class));
}

@Test
void testSendUnsentRequest() {
String groupId = "group-id";
NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
.setKey(groupId)),
Optional.empty());

networkClient.add(request);
assertTrue(networkClient.hasAnyPendingRequests());
assertFalse(networkClient.unsentRequests().isEmpty());
assertFalse(client.hasInFlightRequests());
consumerNetworkThread.cleanup();

assertTrue(networkClient.unsentRequests().isEmpty());
assertFalse(client.hasInFlightRequests());
assertFalse(networkClient.hasAnyPendingRequests());
}

private void prepareOffsetCommitRequest(final Map<TopicPartition, Long> expectedOffsets,
final Errors error,
final boolean disconnected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -140,6 +141,34 @@ public void testEnsureTimerSetOnAdd() {
assertEquals(REQUEST_TIMEOUT_MS, ncd.unsentRequests().poll().timer().timeoutMs());
}

@Test
public void testHasAnyPendingRequests() throws Exception {
try (NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate()) {
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
networkClientDelegate.add(unsentRequest);

// unsent
assertTrue(networkClientDelegate.hasAnyPendingRequests());
assertFalse(networkClientDelegate.unsentRequests().isEmpty());
assertFalse(client.hasInFlightRequests());

networkClientDelegate.poll(0, time.milliseconds());

// in-flight
assertTrue(networkClientDelegate.hasAnyPendingRequests());
assertTrue(networkClientDelegate.unsentRequests().isEmpty());
assertTrue(client.hasInFlightRequests());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to cover the whole flow, what about extending the test to ensure that when we get a response, the hasAnyPendingRequests goes false? (there's a prepareFindCoordinatorResponse that should be handy for it)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense, I will take a look 😺


client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, GROUP_ID, mockNode()));
networkClientDelegate.poll(0, time.milliseconds());

// get response
assertFalse(networkClientDelegate.hasAnyPendingRequests());
assertTrue(networkClientDelegate.unsentRequests().isEmpty());
assertFalse(client.hasInFlightRequests());
}
}

public NetworkClientDelegate newNetworkClientDelegate() {
LogContext logContext = new LogContext();
Properties properties = new Properties();
Expand Down