Merged

Changes from all commits
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
@@ -71,6 +71,11 @@ public FetchSessionHandler(LogContext logContext, int node) {
this.node = node;
}

// visible for testing
public int sessionId() {
return nextMetadata.sessionId();
}

/**
* All of the partitions which exist in the fetch request session.
*/
@@ -525,7 +530,7 @@ public boolean handleResponse(FetchResponse response, short version) {
if (response.error() == Errors.FETCH_SESSION_ID_NOT_FOUND) {
nextMetadata = FetchMetadata.INITIAL;
} else {
-nextMetadata = nextMetadata.nextCloseExisting();
nextMetadata = nextMetadata.nextCloseExistingAttemptNew();
}
return false;
}
@@ -567,7 +572,7 @@ public boolean handleResponse(FetchResponse response, short version) {
String problem = verifyIncrementalFetchResponsePartitions(topicPartitions, response.topicIds(), version);
if (problem != null) {
log.info("Node {} sent an invalid incremental fetch response with {}", node, problem);
-nextMetadata = nextMetadata.nextCloseExisting();
nextMetadata = nextMetadata.nextCloseExistingAttemptNew();
return false;
} else if (response.sessionId() == INVALID_SESSION_ID) {
// The incremental fetch session was closed by the server.
@@ -590,6 +595,14 @@ public boolean handleResponse(FetchResponse response, short version) {
}
}

/**
* The client will initiate the session close on the next fetch request.
*/
public void notifyClose() {
log.debug("Set the metadata for the next fetch request to close the existing session ID={}", nextMetadata.sessionId());
nextMetadata = nextMetadata.nextCloseExisting();
}

Member:

What would happen if the session handler is reused after this is called? Should we add unit tests in FetchSessionHandlerTest to be complete?

Member Author:

SessionHandler should not be reused here after close because:

1. We drain all completed fetches before closing the sessions, so no completed fetch will use a session.
2. Fetcher is only called from the Consumer, and the Consumer is accessed by a single thread: while it is processing the close, it will not poll or call Fetcher.sendFetches, so the session handler will not be used.
3. The sessionHandlers map is cleared after the close request is sent in Fetcher.close().
4. We have ensured that no other thread (e.g. a FetchResponse future) can use the Fetcher while it is being closed, by acquiring a lock on the Fetcher (synchronized (Fetcher.this)) before close starts. This ensures that the session handlers are not used by anyone before close completes (which clears the sessionHandlers map).

Is my understanding correct here?

Regarding the test, what kind of validation/assertion would you like to see from it? I can't think of a test that might be useful for us here.
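One assertion that could pin down the new behavior — sketched here under the assumption that FetchMetadata exposes epoch() alongside sessionId(); the test name and setup are illustrative, not part of the PR:

@Test
public void testNotifyCloseSetsFinalEpoch() {
    FetchSessionHandler handler = new FetchSessionHandler(new LogContext(), 1);
    handler.notifyClose();
    // The close is signalled purely through the metadata of the next request,
    // so it can be verified without ever contacting a broker.
    FetchSessionHandler.FetchRequestData data = handler.newBuilder().build();
    assertEquals(FetchMetadata.FINAL_EPOCH, data.metadata().epoch());
}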


/**
* Handle an error sending the prepared request.
*
@@ -600,7 +613,7 @@ public boolean handleResponse(FetchResponse response, short version) {
*/
public void handleError(Throwable t) {
log.info("Error sending fetch request {} to node {}:", nextMetadata, node, t);
-nextMetadata = nextMetadata.nextCloseExisting();
nextMetadata = nextMetadata.nextCloseExistingAttemptNew();
}

/**
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -59,6 +59,7 @@
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.event.Level;

import java.net.InetSocketAddress;
import java.time.Duration;
@@ -824,7 +825,7 @@ public KafkaConsumer(Map<String, Object> configs,
// call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
// we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
if (this.log != null) {
-close(0, true);
close(Duration.ZERO, true);
}
// now propagate the exception
throw new KafkaException("Failed to construct kafka consumer", t);
@@ -2397,7 +2398,7 @@ public void close(Duration timeout) {
if (!closed) {
// need to close before setting the flag since the close function
// itself may trigger rebalance callback that needs the consumer to be open still
-close(timeout.toMillis(), false);
close(timeout, false);
}
} finally {
closed = true;
@@ -2425,17 +2426,38 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer<
return clusterResourceListeners;
}

-private void close(long timeoutMs, boolean swallowException) {
private Timer createTimerForRequest(final Duration timeout) {
// this.time could be null if an exception occurs in constructor prior to setting the this.time field
final Time localTime = (time == null) ? Time.SYSTEM : time;
return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
}

private void close(Duration timeout, boolean swallowException) {
log.trace("Closing the Kafka consumer");
AtomicReference<Throwable> firstException = new AtomicReference<>();
-try {
-if (coordinator != null)
-coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs)));
-} catch (Throwable t) {
-firstException.compareAndSet(null, t);
-log.error("Failed to close coordinator", t);

final Timer closeTimer = createTimerForRequest(timeout);
// Close objects with a timeout. The timeout is required because the coordinator & the fetcher send requests to
// the server in the process of closing which may not respect the overall timeout defined for closing the
// consumer.
if (coordinator != null) {
// This is a blocking call bound by the time remaining in closeTimer
Utils.swallow(log, Level.ERROR, "Failed to close coordinator with a timeout(ms)=" + closeTimer.timeoutMs(), () -> coordinator.close(closeTimer), firstException);
}
-Utils.closeQuietly(fetcher, "fetcher", firstException);

if (fetcher != null) {
// the timeout for the session close is at most the requestTimeoutMs
long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
if (remainingDurationInTimeout > 0) {
remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout);
}

closeTimer.reset(remainingDurationInTimeout);

// This is a blocking call bound by the time remaining in closeTimer
Utils.swallow(log, Level.ERROR, "Failed to close fetcher with a timeout(ms)=" + closeTimer.timeoutMs(), () -> fetcher.close(closeTimer), firstException);
}

Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
Utils.closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
Utils.closeQuietly(metrics, "consumer metrics", firstException);
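To make the timeout budgeting above concrete, here is a standalone sketch of the arithmetic (illustrative values only; requestTimeoutMs stands in for the consumer's configured request timeout):

public class CloseBudgetExample {
    public static void main(String[] args) {
        long requestTimeoutMs = 30_000;  // assumed request timeout
        long closeTimeoutMs = 45_000;    // the Duration passed to KafkaConsumer.close
        // The coordinator close is bounded by min(close timeout, request timeout).
        long coordinatorBudgetMs = Math.min(closeTimeoutMs, requestTimeoutMs);  // 30_000
        long coordinatorElapsedMs = 10_000;  // suppose the coordinator close took 10s
        // The fetcher gets whatever remains of the overall budget, again capped
        // by the request timeout; a zero budget makes the close best-effort.
        long remainingMs = Math.max(0, closeTimeoutMs - coordinatorElapsedMs);  // 35_000
        long fetcherBudgetMs = remainingMs > 0 ? Math.min(requestTimeoutMs, remainingMs) : 0;  // 30_000
        System.out.printf("coordinator=%dms, fetcher=%dms%n", coordinatorBudgetMs, fetcherBudgetMs);
        // With close(Duration.ZERO), both budgets are 0 and the close does not block.
    }
}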
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -104,6 +104,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -159,7 +160,7 @@ public class Fetcher<K, V> implements Closeable {
private final Set<Integer> nodesWithPendingFetchRequests;
private final ApiVersions apiVersions;
private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);

private final AtomicBoolean isClosed = new AtomicBoolean(false);
private CompletedFetch nextInLineFetch = null;

public Fetcher(LogContext logContext,
@@ -253,25 +254,7 @@ public synchronized int sendFetches() {
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();
-final short maxVersion;
-if (!data.canUseTopicIds()) {
-maxVersion = (short) 12;
-} else {
-maxVersion = ApiKeys.FETCH.latestVersion();
-}
-final FetchRequest.Builder request = FetchRequest.Builder
-.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, data.toSend())
-.isolationLevel(isolationLevel)
-.setMaxBytes(this.maxBytes)
-.metadata(data.metadata())
-.removed(data.toForget())
-.replaced(data.toReplace())
-.rackId(clientRackId);
-
-if (log.isDebugEnabled()) {
-log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
-}
-RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
final RequestFuture<ClientResponse> future = sendFetchRequestToNode(data, fetchTarget);
// We add the node to the set of nodes with pending fetch requests before adding the
// listener because the future may have been fulfilled on another thread (e.g. during a
// disconnection being handled by the heartbeat thread) which will mean the listener
@@ -447,6 +430,33 @@ private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest.Builder
return client.send(node, request);
}

/**
* Send a fetch request to the given broker node asynchronously.
*
* This method is visible for testing.
*
* @return A future that indicates the result of the sent fetch request
*/
private RequestFuture<ClientResponse> sendFetchRequestToNode(final FetchSessionHandler.FetchRequestData requestData,
final Node fetchTarget) {
// Version 12 is the maximum version that could be used without topic IDs. See FetchRequest.json for schema
// changelog.
final short maxVersion = requestData.canUseTopicIds() ? ApiKeys.FETCH.latestVersion() : (short) 12;

final FetchRequest.Builder request = FetchRequest.Builder
.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, requestData.toSend())
.isolationLevel(isolationLevel)
.setMaxBytes(this.maxBytes)
.metadata(requestData.metadata())
.removed(requestData.toForget())
.replaced(requestData.toReplace())
.rackId(clientRackId);

log.debug("Sending {} {} to broker {}", isolationLevel, requestData, fetchTarget);

return client.send(fetchTarget, request);
}

private Long offsetResetStrategyTimestamp(final TopicPartition partition) {
OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
if (strategy == OffsetResetStrategy.EARLIEST)
@@ -1936,11 +1946,77 @@ private Map<String, String> topicPartitionTags(TopicPartition tp) {
}
}

// Visible for testing
void maybeCloseFetchSessions(final Timer timer) {
final Cluster cluster = metadata.fetch();
final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
// Set the session handler to notify close. This sets the metadata of the next fetch request to close the existing session.
sessionHandler.notifyClose();

final int sessionId = sessionHandler.sessionId();
// The fetch target node may no longer be available (e.g. it has disconnected). In such cases, we will
// skip sending the close request.
final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
return;
}

Member Author:

FYI reviewer

It is possible that the node is no longer part of the cluster or the connection to that node has been disconnected. In such scenarios, we don't want to try sending a final fetch request to the server. Note that the node is not necessarily the coordinator and could be another broker (such as a read replica). The process of choosing a node to establish a fetch session is determined at https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1197-L1225

final RequestFuture<ClientResponse> responseFuture = sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse value) {
log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);
}

@Override
public void onFailure(RuntimeException e) {
log.debug("Unable to a close message for fetch session: {} to node: {}. " +
"This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e);
}
});

requestFutures.add(responseFuture);
});

// Poll to ensure that the requests have been written to the socket. Wait until either the timer has expired or until
// all requests have received a response.
while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)) {
client.poll(timer, null, true);
}

if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
// we ran out of time before completing all futures. It is ok since we don't want to block the shutdown
// here.
log.debug("All requests couldn't be sent in the specific timeout period {}ms. " +
"This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for " +
"KafkaConsumer.close(Duration timeout)", timer.timeoutMs());
}
}

public void close(final Timer timer) {
if (!isClosed.compareAndSet(false, true)) {
log.info("Fetcher {} is already closed.", this);
return;
}

// Shared states (e.g. sessionHandlers) could be accessed by multiple threads (such as heartbeat thread), hence,
// it is necessary to acquire a lock on the fetcher instance before modifying the states.
synchronized (Fetcher.this) {
// we do not need to re-enable wakeups since we are closing already
client.disableWakeups();
if (nextInLineFetch != null)
nextInLineFetch.drain();
maybeCloseFetchSessions(timer);
Utils.closeQuietly(decompressionBufferSupplier, "decompressionBufferSupplier");
sessionHandlers.clear();
}
}

@Override
public void close() {
-if (nextInLineFetch != null)
-nextInLineFetch.drain();
-decompressionBufferSupplier.close();
close(time.timer(0));
}

private Set<String> topicsForPartitions(Collection<TopicPartition> partitions) {
clients/src/main/java/org/apache/kafka/common/requests/FetchMetadata.java
@@ -114,9 +114,18 @@ public boolean equals(Object o) {
}

/**
-* Return the metadata for the next error response.
* Return the metadata for the next request. The metadata is set to indicate that the client wants to close the
* existing session.
*/
public FetchMetadata nextCloseExisting() {
return new FetchMetadata(sessionId, FINAL_EPOCH);
}

Member Author:

FYI reviewer

Server handles this message in FetchSession.scala at https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/FetchSession.scala#L785

/**
* Return the metadata for the next request. The metadata is set to indicate that the client wants to close the
* existing session and create a new one if possible.
*/
public FetchMetadata nextCloseExistingAttemptNew() {
return new FetchMetadata(sessionId, INITIAL_EPOCH);
}
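A quick illustration of the difference between the two methods (a sketch, not part of the PR; it assumes the FetchMetadata constructor and the sessionId()/epoch() accessors are visible to the caller):

FetchMetadata current = new FetchMetadata(123, 5);  // hypothetical live session

FetchMetadata closeOnly = current.nextCloseExisting();
// sessionId == 123, epoch == FINAL_EPOCH: the broker closes session 123
// and the request is not treated as the start of a new session.

FetchMetadata closeAndRetry = current.nextCloseExistingAttemptNew();
// sessionId == 123, epoch == INITIAL_EPOCH: the broker closes session 123
// and handles the request as a full fetch that may establish a new session.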

clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -166,6 +166,11 @@ public Builder isolationLevel(IsolationLevel isolationLevel) {
return this;
}

// Visible for testing
public FetchMetadata metadata() {
return this.metadata;
}

public Builder metadata(FetchMetadata metadata) {
this.metadata = metadata;
return this;
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (33 additions & 9 deletions, mode 100755 → 100644)
@@ -28,6 +28,7 @@
import org.apache.kafka.common.network.TransferableChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import java.io.Closeable;
import java.io.DataOutput;
@@ -997,16 +998,39 @@ public static void closeAll(Closeable... closeables) throws IOException {
if (exception != null)
throw exception;
}
public static void swallow(final Logger log, final Level level, final String what, final Runnable code) {
swallow(log, level, what, code, null);
}

-public static void swallow(
-Logger log,
-String what,
-Runnable runnable
-) {
-try {
-runnable.run();
-} catch (Throwable e) {
-log.warn("{} error", what, e);
/**
* Run the supplied code. If an exception is thrown, it is swallowed and registered to the firstException parameter.
*/
public static void swallow(final Logger log, final Level level, final String what, final Runnable code,
final AtomicReference<Throwable> firstException) {
if (code != null) {
try {
code.run();
} catch (Throwable t) {
switch (level) {
case INFO:
log.info(what, t);
break;
case DEBUG:
log.debug(what, t);
break;
case ERROR:
log.error(what, t);
break;
case TRACE:
log.trace(what, t);
break;
case WARN:
default:
log.warn(what, t);
}
if (firstException != null)
firstException.compareAndSet(null, t);
}
}
}
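For context, this mirrors how KafkaConsumer.close uses the new overload above; a minimal sketch (log, coordinator, fetcher, and closeTimer stand in for the surrounding fields):

AtomicReference<Throwable> firstException = new AtomicReference<>();
Utils.swallow(log, Level.ERROR,
        "Failed to close coordinator with a timeout(ms)=" + closeTimer.timeoutMs(),
        () -> coordinator.close(closeTimer), firstException);
Utils.closeQuietly(fetcher, "fetcher", firstException);
// Only the first captured failure is surfaced once everything has been closed:
Throwable t = firstException.get();
if (t != null)
    throw new KafkaException("Failed to close kafka consumer", t);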
