From 4e74f78a2e6ab8053cdcdb3bb70401637c75514c Mon Sep 17 00:00:00 2001
From: Vahid Hashemian
Date: Tue, 19 Apr 2016 11:21:45 -0700
Subject: [PATCH] KAFKA-3579 - Update reference to the outdated consumer
 property

Add references to the new consumer property 'max.partition.fetch.bytes'
along with the old consumer property 'fetch.message.max.bytes' in the
corresponding warning messages of TopicCommand.

Also, create and leverage a static variable for the default value of the
new consumer property.

Also, use 'DEFAULT_...' for default property constant names in the code
instead of '..._DEFAULT'.
---
 .../clients/consumer/ConsumerConfig.java      |  7 ++--
 .../internals/ConsumerCoordinatorTest.java    | 14 ++++----
 .../main/scala/kafka/admin/TopicCommand.scala | 35 +++++++++++--------
 3 files changed, 31 insertions(+), 25 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 69c4a3620925..6523d184585c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -115,6 +115,7 @@ public class ConsumerConfig extends AbstractConfig {
      */
     public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
     private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be #partitions * max.partition.fetch.bytes. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.";
+    public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;
 
     /** send.buffer.bytes */
     public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
@@ -184,7 +185,7 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";
     private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether records from internal topics (such as offsets) should be exposed to the consumer. "
" + "If set to true the only way to receive records from an internal topic is subscribing to it."; - public static final boolean EXCLUDE_INTERNAL_TOPICS_DEFAULT = true; + public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true; static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, @@ -231,7 +232,7 @@ public class ConsumerConfig extends AbstractConfig { CommonClientConfigs.CLIENT_ID_DOC) .define(MAX_PARTITION_FETCH_BYTES_CONFIG, Type.INT, - 1 * 1024 * 1024, + DEFAULT_MAX_PARTITION_FETCH_BYTES, atLeast(0), Importance.HIGH, MAX_PARTITION_FETCH_BYTES_DOC) @@ -332,7 +333,7 @@ public class ConsumerConfig extends AbstractConfig { MAX_POLL_RECORDS_DOC) .define(EXCLUDE_INTERNAL_TOPICS_CONFIG, Type.BOOLEAN, - EXCLUDE_INTERNAL_TOPICS_DEFAULT, + DEFAULT_EXCLUDE_INTERNAL_TOPICS, Importance.MEDIUM, EXCLUDE_INTERNAL_TOPICS_DOC) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 5a174db16241..4fce8fa08243 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -112,7 +112,7 @@ public void setup() { this.partitionAssignor.clear(); client.setNode(node); - this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, autoCommitEnabled); + this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled); } @After @@ -733,7 +733,7 @@ public void testAutoCommitDynamicAssignment() { final String consumerId = "consumer"; ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); subscriptions.needReassignment(); @@ -759,7 +759,7 @@ public void testAutoCommitDynamicAssignmentRebalance() { final String consumerId = "consumer"; ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); subscriptions.needReassignment(); @@ -787,7 +787,7 @@ public void testAutoCommitDynamicAssignmentRebalance() { @Test public void testAutoCommitManualAssignment() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.seek(tp, 100); @@ -805,7 +805,7 @@ public void testAutoCommitManualAssignment() { @Test public void testAutoCommitManualAssignmentCoordinatorUnknown() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.seek(tp, 100); @@ -1094,7 +1094,7 @@ public void testProtocolMetadataOrder() { try (Metrics metrics = new Metrics(time)) { ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.asList(roundRobin, range), - ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false); + 
             List<ProtocolMetadata> metadata = coordinator.metadata();
             assertEquals(2, metadata.size());
             assertEquals(roundRobin.name(), metadata.get(0).name());
@@ -1103,7 +1103,7 @@ public void testProtocolMetadataOrder() {
 
         try (Metrics metrics = new Metrics(time)) {
             ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.asList(range, roundRobin),
-                                                               ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false);
+                                                               ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false);
             List<ProtocolMetadata> metadata = coordinator.metadata();
             assertEquals(2, metadata.size());
             assertEquals(range.name(), metadata.get(0).name());
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 029adea1b58c..e6ebb96c872b 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -20,7 +20,7 @@ package kafka.admin
 import java.util.Properties
 import joptsimple._
 import kafka.common.{AdminCommandFailedException, Topic, TopicExistsException}
-import kafka.consumer.{ConsumerConfig, Whitelist}
+import kafka.consumer.{ConsumerConfig => OldConsumerConfig, Whitelist}
 import kafka.coordinator.GroupCoordinator
 import kafka.log.{Defaults, LogConfig}
 import kafka.server.ConfigType
@@ -31,6 +31,7 @@ import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
 import scala.collection.JavaConversions._
 import scala.collection._
+import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
 import org.apache.kafka.common.internals.TopicConstants
 
 
@@ -383,30 +384,34 @@ object TopicCommand extends Logging {
 
   def shortMessageSizeWarning(maxMessageBytes: Int): String = {
     "\n\n" +
       "*****************************************************************************************************\n" +
-      "*** WARNING: you are creating a topic where the max.message.bytes is greater than the consumer ***\n" +
-      "*** default. This operation is potentially dangerous. Consumers will get failures if their ***\n" +
-      "*** fetch.message.max.bytes < the value you are using. ***\n" +
+      "*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker's ***\n" +
+      "*** default max.message.bytes. This operation is potentially dangerous. Consumers will get ***\n" +
+      s"*** failures if their fetch.message.max.bytes (old consumer) or ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG} ***\n" +
+      "*** (new consumer) < the value you are using. ***\n" +
       "*****************************************************************************************************\n" +
       s"- value set here: $maxMessageBytes\n" +
-      s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n" +
+      s"- Default Old Consumer fetch.message.max.bytes: ${OldConsumerConfig.FetchSize}\n" +
+      s"- Default New Consumer ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: ${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n" +
       s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n\n"
   }
 
   def longMessageSizeWarning(maxMessageBytes: Int): String = {
     "\n\n" +
-      "****************************************************************************************************\n" +
-      "*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker ***\n" +
-      "*** default. This operation is dangerous. There are two potential side effects: ***\n" +
-      "*** - Consumers will get failures if their fetch.message.max.bytes < the value you are using ***\n" +
-      "*** - Producer requests larger than replica.fetch.max.bytes will not replicate and hence have ***\n" +
-      "*** a higher risk of data loss ***\n" +
-      "*** You should ensure both of these settings are greater than the value set here before using ***\n" +
-      "*** this topic. ***\n" +
-      "****************************************************************************************************\n" +
+      "*****************************************************************************************************\n" +
+      "*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker's ***\n" +
+      "*** default max.message.bytes. This operation is dangerous. There are two potential side effects: ***\n" +
+      "*** - Consumers will get failures if their fetch.message.max.bytes (old consumer) or ***\n" +
+      s"*** ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG} (new consumer) < the value you are using ***\n" +
+      "*** - Producer requests larger than replica.fetch.max.bytes will not replicate and hence have ***\n" +
+      "*** a higher risk of data loss ***\n" +
+      "*** You should ensure both of these settings are greater than the value set here before using ***\n" +
+      "*** this topic. ***\n" +
+      "*****************************************************************************************************\n" +
       s"- value set here: $maxMessageBytes\n" +
       s"- Default Broker replica.fetch.max.bytes: ${kafka.server.Defaults.ReplicaFetchMaxBytes}\n" +
       s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n" +
-      s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n\n"
+      s"- Default Old Consumer fetch.message.max.bytes: ${OldConsumerConfig.FetchSize}\n" +
+      s"- Default New Consumer ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: ${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n\n"
   }
 }
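
Usage sketch (reviewer addition, not part of the patch): the warning exists because a topic's max.message.bytes must fit within the new consumer's per-partition fetch size, or fetches on that topic can get stuck. A minimal, hypothetical Java example of a client acting on the warning; the broker address, group id, and 2 MB topic limit are assumptions, while the config keys and the DEFAULT_MAX_PARTITION_FETCH_BYTES constant are the ones touched by this patch:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchSizeExample {
    public static void main(String[] args) {
        // Hypothetical topic-level max.message.bytes (2 MB). Messages up to this
        // size must fit within the consumer's max.partition.fetch.bytes.
        int topicMaxMessageBytes = 2 * 1024 * 1024;

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Raise max.partition.fetch.bytes above the 1 MB default when the topic
        // allows larger messages; DEFAULT_MAX_PARTITION_FETCH_BYTES is the
        // constant introduced by this patch.
        int fetchBytes = Math.max(ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES,
                topicMaxMessageBytes);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, fetchBytes);

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}

An old-consumer client would raise fetch.message.max.bytes the same way, which is why the updated TopicCommand warning names both properties.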