From 20311836762f02a29b880b7edee76567b1d60c7c Mon Sep 17 00:00:00 2001 From: yaojuncn Date: Sat, 5 Nov 2016 22:12:22 -0700 Subject: [PATCH 1/2] KAFKA-4385-avoid-unnecessary-meta-request-overhead add a producer config: "metadata.fetch.max.count" default to Integer.MAX_VALUE, then everything is the same as before; when specified, the metadata request will be limited by both "metadata.fetch.timeout.ms" (time limit) and "metadata.fetch.max.count" (request count) when "auto.create.topics.enable=false", "metadata.fetch.max.count" can be set as 1, so that it could avoid lots of unncessary duplicate metadata --- .../kafka/clients/producer/KafkaProducer.java | 32 +++++++++++++++---- .../clients/producer/ProducerConfig.java | 10 ++++++ .../errors/MetadataNotAvailableException.java | 21 ++++++++++++ 3 files changed, 56 insertions(+), 7 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/MetadataNotAvailableException.java diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 489c762fcf9a4..a1265b01f1641 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.MetadataNotAvailableException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.JmxReporter; @@ -149,6 +150,7 @@ public class KafkaProducer implements Producer { private final ProducerConfig producerConfig; private final long maxBlockTimeMs; private final int requestTimeoutMs; + private final int maxMetaFetchCount; private final ProducerInterceptors interceptors; /** @@ -277,6 +279,13 @@ private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serial this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); } + if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_MAX_COUNT_CONFIG)) { + this.maxMetaFetchCount = config.getInt(ProducerConfig.METADATA_FETCH_MAX_COUNT_CONFIG); + } else { // if not specified, then this parameter is just not taking effect + this.maxMetaFetchCount = Integer.MAX_VALUE; + } + + /* check for user defined settings. * If the TIME_OUT config is set use that for request timeout. * This should be removed with release 0.9 @@ -530,12 +539,12 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long long begin = time.milliseconds(); long remainingWaitMs = maxWaitMs; - long elapsed; - // Issue metadata requests until we have metadata for the topic or maxWaitTimeMs is exceeded. - // In case we already have cached metadata for the topic, but the requested partition is greater - // than expected, issue an update request only once. This is necessary in case the metadata - // is stale and the number of partitions for this topic has increased in the meantime. - do { + long elapsed = 0; + + // update metadata controlled by both maxWaitMs and maxMetaFetchCount + // when auto.create.topics.enable=true and sending a msg to non-exist topic + // setting maxMetaFetchCount will reduce many unnecessary metadata request + for (int i = 0; i < maxMetaFetchCount; ++i) { log.trace("Requesting metadata update for topic {}.", topic); int version = metadata.requestUpdate(); sender.wakeup(); @@ -553,7 +562,16 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long throw new TopicAuthorizationException(topic); remainingWaitMs = maxWaitMs - elapsed; partitionsCount = cluster.partitionCountForTopic(topic); - } while (partitionsCount == null); + + // get meta data for this topic, no need to call again + if (null != partitionsCount) { + break; + } + } + + if (null == partitionsCount) { + throw new MetadataNotAvailableException("Metadata not available for topic=" + topic); + } if (partition != null && partition >= partitionsCount) { throw new KafkaException( diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 4b0e999e018c5..a53a9328c2409 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -55,6 +55,11 @@ public class ProducerConfig extends AbstractConfig { + "host the topic's partitions. This config specifies the maximum time, in milliseconds, for this fetch " + "to succeed before throwing an exception back to the client."; + public static final String METADATA_FETCH_MAX_COUNT_CONFIG = "metadata.fetch.max.count"; + private static final String METADATA_FETCH_MAX_COUNT_DOC = "when fetching metadata for a topic, within the time of metadata.fetch.timeout.ms, " + + " if the metadata response does not contain the topic's metadata, then it will keep sending the meta request until metadata.fetch.timeout.ms is exceeded" + + " this will become too many overhead when broker side has configured auto.create.topics.enable=false and a msg is sent to non-exist topic "; + /** metadata.max.age.ms */ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC; @@ -251,6 +256,11 @@ public class ProducerConfig extends AbstractConfig { atLeast(0), Importance.LOW, METADATA_FETCH_TIMEOUT_DOC) + .define(METADATA_FETCH_MAX_COUNT_CONFIG, + Type.INT, + Integer.MAX_VALUE, + Importance.LOW, + METADATA_FETCH_MAX_COUNT_DOC) .define(MAX_BLOCK_MS_CONFIG, Type.LONG, 60 * 1000, diff --git a/clients/src/main/java/org/apache/kafka/common/errors/MetadataNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/MetadataNotAvailableException.java new file mode 100644 index 0000000000000..2dd3de2600e29 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/MetadataNotAvailableException.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class MetadataNotAvailableException extends RetriableException { + + public MetadataNotAvailableException(String message) { + super(message); + } + +} From 5fb0f75b401ad5f0bfa6d4e5bd7535c67ab944ac Mon Sep 17 00:00:00 2001 From: yaojuncn Date: Sat, 12 Nov 2016 20:42:05 -0800 Subject: [PATCH 2/2] KAFKA-4385: fix comment error --- .../java/org/apache/kafka/clients/producer/KafkaProducer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a1265b01f1641..f7221eba86b00 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -542,8 +542,8 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long long elapsed = 0; // update metadata controlled by both maxWaitMs and maxMetaFetchCount - // when auto.create.topics.enable=true and sending a msg to non-exist topic - // setting maxMetaFetchCount will reduce many unnecessary metadata request + // when auto.create.topics.enable=false and sending a msg to non-exist topic + // setting maxMetaFetchCount=1 can reduce many unnecessary metadata request for (int i = 0; i < maxMetaFetchCount; ++i) { log.trace("Requesting metadata update for topic {}.", topic); int version = metadata.requestUpdate();