Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14768_KIP913: add new method to provide possibility for warming up first record's sending and reducing the max.block.ms safely. #13320

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1245,9 +1245,22 @@ public void flush() {
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
return getCluster(topic, this.maxBlockTimeMs).partitionsForTopic(topic);
}

/**
* Get the cluster info for the given topic. This can also be used for producer's warmup to accelerate first record's sending.
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws AuthorizationException if not authorized to the specified topic. See the exception for more details
* @throws InterruptException if the thread is interrupted while blocked
* @throws TimeoutException if metadata could not be refreshed within {@code maxBlockTimeMs}
* @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
*/
@Override
public Cluster getCluster(String topic, long maxBlockTimeMs) {
Objects.requireNonNull(topic, "topic cannot be null");
try {
return waitOnMetadata(topic, null, time.milliseconds(), maxBlockTimeMs).cluster.partitionsForTopic(topic);
return waitOnMetadata(topic, null, time.milliseconds(), maxBlockTimeMs).cluster;
} catch (InterruptedException e) {
throw new InterruptException(e);
}
Expand Down
Expand Up @@ -379,6 +379,11 @@ public List<PartitionInfo> partitionsFor(String topic) {
return this.cluster.partitionsForTopic(topic);
}

@Override
public Cluster getCluster(String topic, long maxBlockTimeMs) {
return this.cluster;
}

public Map<MetricName, Metric> metrics() {
return mockMetrics;
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
Expand Down Expand Up @@ -90,6 +91,11 @@ void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
*/
List<PartitionInfo> partitionsFor(String topic);

/**
* See {@link KafkaProducer#getCluster(String, long)}
*/
Cluster getCluster(String topic, long maxBlockTimeMs);

/**
* See {@link KafkaProducer#metrics()}
*/
Expand Down