From 330ed5c766f8cc29bbd08c6fcf8d8878e0e9dfcc Mon Sep 17 00:00:00 2001 From: Cheryl Simmons Date: Mon, 1 May 2023 12:45:58 -0700 Subject: [PATCH 1/9] fixing format issues --- .../main/scala/kafka/server/KafkaConfig.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ccbf5fb314ba..16060d2aa32b 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -749,7 +749,7 @@ object KafkaConfig { s"are not appended to the metadata partition. The default value is ${Defaults.MetadataMaxIdleIntervalMs}"; val ControllerListenerNamesDoc = "A comma-separated list of the names of the listeners used by the controller. This is required " + "if running in KRaft mode. When communicating with the controller quorum, the broker will always use the first listener in this list.\n " + - "Note: The ZK-based controller should not set this configuration." + "Note: The ZooKeeper based controller should not set this configuration." val SaslMechanismControllerProtocolDoc = "SASL mechanism used for communication with controllers. Default is GSSAPI." val MetadataLogSegmentBytesDoc = "The maximum size of a single metadata log file." val MetadataLogSegmentMinBytesDoc = "Override the minimum size for a single metadata log file. This should be used for testing only." @@ -802,18 +802,18 @@ object KafkaConfig { "is assumed if no explicit mapping is provided and no other security protocol is in use." val controlPlaneListenerNameDoc = "Name of listener used for communication between controller and brokers. " + s"Broker will use the $ControlPlaneListenerNameProp to locate the endpoint in $ListenersProp list, to listen for connections from the controller. 
" + - "For example, if a broker's config is :\n" + - "listeners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094\n" + - "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" + - "control.plane.listener.name = CONTROLLER\n" + + "For example, if a broker's config is:\n" + + "listeners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094" + + "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL" + + "control.plane.listener.name = CONTROLLER\n" + "On startup, the broker will start listening on \"192.1.1.8:9094\" with security protocol \"SSL\".\n" + s"On controller side, when it discovers a broker's published endpoints through zookeeper, it will use the $ControlPlaneListenerNameProp " + "to find the endpoint, which it will use to establish connection to the broker.\n" + - "For example, if the broker's published endpoints on zookeeper are :\n" + + "For example, if the broker's published endpoints on ZooKeeper are :\n" + "\"endpoints\" : [\"INTERNAL://broker1.example.com:9092\",\"EXTERNAL://broker1.example.com:9093\",\"CONTROLLER://broker1.example.com:9094\"]\n" + " and the controller's config is :\n" + - "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" + - "control.plane.listener.name = CONTROLLER\n" + + "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" + + "control.plane.listener.name = CONTROLLER\n" + "then controller will use \"broker1.example.com:9094\" with security protocol \"SSL\" to connect to the broker.\n" + "If not explicitly configured, the default value will be null and there will be no dedicated endpoints for controller connections.\n" + s"If explicitly configured, the value cannot be the same as the value of $InterBrokerListenerNameProp." From 3271ea563ccc734471a399031242abf6aaf272e6 Mon Sep 17 00:00:00 2001 From: Cheryl Simmons Date: Mon, 1 May 2023 16:36:45 -0700 Subject: [PATCH 2/9] more format fixes --- .../main/scala/kafka/server/KafkaConfig.scala | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 16060d2aa32b..dabcd9c54b77 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -667,9 +667,9 @@ object KafkaConfig { "The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. " + "For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path." val ZkSessionTimeoutMsDoc = "Zookeeper session timeout" - val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to zookeeper. If not set, the value in " + ZkSessionTimeoutMsProp + " is used" + val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to ZooKeeper. If not set, the value in " + ZkSessionTimeoutMsProp + " is used" val ZkEnableSecureAclsDoc = "Set client to use secure ACLs" - val ZkMaxInFlightRequestsDoc = "The maximum number of unacknowledged requests the client will send to Zookeeper before blocking." + val ZkMaxInFlightRequestsDoc = "The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking." 
val ZkSslClientEnableDoc = "Set client to use TLS when connecting to ZooKeeper." + " An explicit value overrides any value set via the zookeeper.client.secure system property (note the different name)." + s" Defaults to false if neither is set; when true, $ZkClientCnxnSocketProp must be set (typically to org.apache.zookeeper.ClientCnxnSocketNetty); other values to set may include " + @@ -709,7 +709,7 @@ object KafkaConfig { val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed." val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id" val BrokerIdDoc = "The broker id for this server. If unset, a unique broker id will be generated." + - "To avoid conflicts between zookeeper generated broker id's and user configured broker id's, generated broker ids " + + "To avoid conflicts between ZooKeeper generated broker id's and user configured broker id's, generated broker ids " + "start from " + MaxReservedBrokerIdProp + " + 1." val MessageMaxBytesDoc = TopicConfig.MAX_MESSAGE_BYTES_DOC + s"This can be set per topic with the topic level ${TopicConfig.MAX_MESSAGE_BYTES_CONFIG} config." @@ -725,7 +725,7 @@ object KafkaConfig { /** KRaft mode configs */ val ProcessRolesDoc = "The roles that this process plays: 'broker', 'controller', or 'broker,controller' if it is both. " + - "This configuration is only applicable for clusters in KRaft (Kafka Raft) mode (instead of ZooKeeper). Leave this config undefined or empty for Zookeeper clusters." + "This configuration is only applicable for clusters in KRaft (Kafka Raft) mode (instead of ZooKeeper). Leave this config undefined or empty for ZooKeeper clusters." val InitialBrokerRegistrationTimeoutMsDoc = "When initially registering with the controller quorum, the number of milliseconds to wait before declaring failure and exiting the broker process." val BrokerHeartbeatIntervalMsDoc = "The length of time in milliseconds between broker heartbeats. Used when running in KRaft mode." val BrokerSessionTimeoutMsDoc = "The length of time in milliseconds that a broker lease lasts if no heartbeats are made. Used when running in KRaft mode." @@ -779,9 +779,9 @@ object KafkaConfig { " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" + " Leave hostname empty to bind to default interface.\n" + " Examples of legal listener lists:\n" + - " PLAINTEXT://myhost:9092,SSL://:9091\n" + - " CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093\n" + - " PLAINTEXT://127.0.0.1:9092,SSL://[::1]:9092\n" + " PLAINTEXT://myhost:9092,SSL://:9091\n" + + " CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093\n" + + " PLAINTEXT://127.0.0.1:9092,SSL://[::1]:9092\n" val AdvertisedListenersDoc = s"Listeners to publish to ZooKeeper for clients to use, if different than the $ListenersProp config property." + " In IaaS environments, this may need to be different from the interface to which the broker binds." + s" If this is not set, the value for $ListenersProp will be used." + @@ -882,12 +882,12 @@ object KafkaConfig { val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted." val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted." 
val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index" - val LogIndexIntervalBytesDoc = "The interval with which we add an entry to the offset index" - val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a log partition before messages are flushed to disk " + val LogIndexIntervalBytesDoc = "The interval with which we add an entry to the offset index." + val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a log partition before messages are flushed to disk." val LogDeleteDelayMsDoc = "The amount of time to wait before deleting a file from the filesystem" val LogFlushSchedulerIntervalMsDoc = "The frequency in ms that the log flusher checks whether any log needs to be flushed to disk" val LogFlushIntervalMsDoc = "The maximum time in ms that a message in any topic is kept in memory before flushed to disk. If not set, the value in " + LogFlushSchedulerIntervalMsProp + " is used" - val LogFlushOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of the last flush which acts as the log recovery point" + val LogFlushOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of the last flush which acts as the log recovery point." val LogFlushStartOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of log start offset" val LogPreAllocateEnableDoc = "Should pre allocate file when create new segment? If you are using Kafka on Windows, you probably need to set it to true." val LogMessageFormatVersionDoc = "Specify the message format version the broker will use to append messages to the logs. The value should be a valid MetadataVersion. " + @@ -896,21 +896,21 @@ object KafkaConfig { "will cause consumers with older versions to break as they will receive messages with a format that they don't understand." val LogMessageTimestampTypeDoc = "Define whether the timestamp in the message is message create time or log append time. The value should be either " + - "`CreateTime` or `LogAppendTime`" + "`CreateTime` or `LogAppendTime." val LogMessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " + "a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected " + "if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime." + "The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling." val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown" - val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server" + val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server." val MinInSyncReplicasDoc = "When a producer sets acks to \"all\" (or \"-1\"), " + - "min.insync.replicas specifies the minimum number of replicas that must acknowledge " + + "$MinInsyncReplicasProp specifies the minimum number of replicas that must acknowledge " + "a write for the write to be considered successful. If this minimum cannot be met, " + - "then the producer will raise an exception (either NotEnoughReplicas or " + - "NotEnoughReplicasAfterAppend).
When used together, min.insync.replicas and acks " + + "then the producer will raise an exception (either NotEnoughReplicas or " + + "NotEnoughReplicasAfterAppend).
When used together, $MinInsyncReplicasProp and acks " + "allow you to enforce greater durability guarantees. A typical scenario would be to " + - "create a topic with a replication factor of 3, set min.insync.replicas to 2, and " + + "create a topic with a replication factor of 3, set $MinInsyncReplicasProp to 2, and " + "produce with acks of \"all\". This will ensure that the producer raises an exception " + "if a majority of replicas do not receive a write." @@ -921,8 +921,8 @@ object KafkaConfig { val LogMessageDownConversionEnableDoc = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC; /** ********* Replication configuration ***********/ - val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels" - val DefaultReplicationFactorDoc = "The default replication factors for automatically created topics" + val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels." + val DefaultReplicationFactorDoc = "The default replication factors for automatically created topics." val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time," + " the leader will remove the follower from isr" val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms" @@ -948,7 +948,7 @@ object KafkaConfig { val FetchPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the fetch request purgatory" val ProducerPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the producer request purgatory" val DeleteRecordsPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the delete records request purgatory" - val AutoLeaderRebalanceEnableDoc = "Enables auto leader balancing. A background thread checks the distribution of partition leaders at regular intervals, configurable by `leader.imbalance.check.interval.seconds`. If the leader imbalance exceeds `leader.imbalance.per.broker.percentage`, leader rebalance to the preferred leader for partitions is triggered." + val AutoLeaderRebalanceEnableDoc = s"Enables auto leader balancing. A background thread checks the distribution of partition leaders at regular intervals, configurable by $LeaderImbalanceCheckIntervalSecondsProp. If the leader imbalance exceeds $LeaderImbalancePerBrokerPercentageProp, leader rebalance to the preferred leader for partitions is triggered." val LeaderImbalancePerBrokerPercentageDoc = "The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage." val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance check is triggered by the controller" val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss" @@ -964,7 +964,7 @@ object KafkaConfig { /** ********* Controlled shutdown configuration ***********/ val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens" val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). 
This config determines the amount of time to wait before retrying." - val ControlledShutdownEnableDoc = "Enable controlled shutdown of the server" + val ControlledShutdownEnableDoc = "Enable controlled shutdown of the server." /** ********* Group coordinator configuration ***********/ val GroupMinSessionTimeoutMsDoc = "The minimum allowed session timeout for registered consumers. Shorter timeouts result in quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources." @@ -987,13 +987,13 @@ object KafkaConfig { val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor." /** ********* Offset management configuration ***********/ - val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit" + val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit." val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache (soft-limit, overridden if records are too large)." val OffsetsTopicReplicationFactorDoc = "The replication factor for the offsets topic (set higher to ensure availability). " + "Internal topic creation will fail until the cluster size meets this replication factor requirement." - val OffsetsTopicPartitionsDoc = "The number of partitions for the offset commit topic (should not change after deployment)" - val OffsetsTopicSegmentBytesDoc = "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads" - val OffsetsTopicCompressionCodecDoc = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits" + val OffsetsTopicPartitionsDoc = "The number of partitions for the offset commit topic (should not change after deployment)." + val OffsetsTopicSegmentBytesDoc = "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads." + val OffsetsTopicCompressionCodecDoc = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits." val OffsetsRetentionMinutesDoc = "For subscribed consumers, committed offset of a specific partition will be expired and discarded when 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); " + "2) this retention period has elapsed since the last time an offset is committed for the partition and the group is no longer subscribed to the corresponding topic. " + "For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit. " + @@ -1002,7 +1002,7 @@ object KafkaConfig { val OffsetsRetentionCheckIntervalMsDoc = "Frequency at which to check for stale offsets" val OffsetCommitTimeoutMsDoc = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " + "or this timeout is reached. This is similar to the producer request timeout." - val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden" + val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden." 
/** ********* Transaction management configuration ***********/ val TransactionalIdExpirationMsDoc = "The time in ms that the transaction coordinator will wait without receiving any transaction status updates " + "for the current transaction before expiring its transactional id. Transactional IDs will not expire while a the transaction is still ongoing." @@ -1013,16 +1013,16 @@ object KafkaConfig { val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). " + "Internal topic creation will fail until the cluster size meets this replication factor requirement." val TransactionsTopicPartitionsDoc = "The number of partitions for the transaction topic (should not change after deployment)." - val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads" - val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out" - val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to transactional.id.expiration.ms passing" + val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads." + val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out." + val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to transactional.id.expiration.ms passing." - val TransactionPartitionVerificationEnableDoc = "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition" + val TransactionPartitionVerificationEnableDoc = "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition." val ProducerIdExpirationMsDoc = "The time in ms that a topic partition leader will wait before expiring producer IDs. Producer IDs will not expire while a transaction associated to them is still ongoing. " + "Note that producer IDs may expire sooner if the last write from the producer ID is deleted due to the topic's retention settings. Setting this value the same or higher than " + "delivery.timeout.ms can help prevent expiration during retries and protect against message duplication, but the default should be reasonable for most use cases." - val ProducerIdExpirationCheckIntervalMsDoc = "The interval at which to remove producer IDs that have expired due to producer.id.expiration.ms passing" + val ProducerIdExpirationCheckIntervalMsDoc = "The interval at which to remove producer IDs that have expired due to producer.id.expiration.ms passing." /** ********* Fetch Configuration **************/ val MaxIncrementalFetchSessionCacheSlotsDoc = "The maximum number of incremental fetch sessions that we will maintain." 
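The min.insync.replicas description above walks through a concrete durability setup: a topic with a replication factor of 3, min.insync.replicas set to 2, and a producer using acks of "all". The following is a minimal sketch of that scenario using the Java clients API; the broker address localhost:9092, the topic name demo-topic, and the class name are illustrative assumptions, not part of this patch series.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.config.TopicConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class MinIsrExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

            // Create a topic with replication factor 3 and min.insync.replicas=2,
            // matching the scenario described in the config doc above.
            try (Admin admin = Admin.create(props)) {
                NewTopic topic = new NewTopic("demo-topic", 3, (short) 3)
                    .configs(Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"));
                admin.createTopics(Collections.singleton(topic)).all().get();
            }

            // acks=all: a send only succeeds once every in-sync replica has the write,
            // and the broker rejects the write when fewer than 2 replicas are in sync.
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("demo-topic", "key", "value")).get();
            }
        }
    }

If the in-sync replica count drops below 2, the send fails with NotEnoughReplicasException (or NotEnoughReplicasAfterAppendException), which is the behavior the doc text describes.
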
From 964570f5f808abbdb25d12aa2bafe7ebf8827fd0 Mon Sep 17 00:00:00 2001 From: Cheryl Simmons Date: Mon, 1 May 2023 17:04:28 -0700 Subject: [PATCH 3/9] fixing variable reference --- core/src/main/scala/kafka/server/KafkaConfig.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index dabcd9c54b77..75a38ca88f26 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -905,12 +905,12 @@ object KafkaConfig { val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown" val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server." val MinInSyncReplicasDoc = "When a producer sets acks to \"all\" (or \"-1\"), " + - "$MinInsyncReplicasProp specifies the minimum number of replicas that must acknowledge " + + s"$MinInsyncReplicasProp specifies the minimum number of replicas that must acknowledge " + "a write for the write to be considered successful. If this minimum cannot be met, " + "then the producer will raise an exception (either NotEnoughReplicas or " + - "NotEnoughReplicasAfterAppend).
When used together, $MinInsyncReplicasProp and acks " + + s"NotEnoughReplicasAfterAppend).
When used together, $MinInsyncReplicasProp and acks " + "allow you to enforce greater durability guarantees. A typical scenario would be to " + - "create a topic with a replication factor of 3, set $MinInsyncReplicasProp to 2, and " + + s"create a topic with a replication factor of 3, set $MinInsyncReplicasProp to 2, and " + "produce with acks of \"all\". This will ensure that the producer raises an exception " + "if a majority of replicas do not receive a write." From afd2e9c7087932f36d1c6f0bf7d4eb0d0c4ef98d Mon Sep 17 00:00:00 2001 From: Cheryl Simmons Date: Tue, 2 May 2023 08:48:35 -0700 Subject: [PATCH 4/9] update to formatting --- core/src/main/scala/kafka/server/KafkaConfig.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 75a38ca88f26..9e23ac4fb24a 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -905,12 +905,12 @@ object KafkaConfig { val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown" val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server." val MinInSyncReplicasDoc = "When a producer sets acks to \"all\" (or \"-1\"), " + - s"$MinInsyncReplicasProp specifies the minimum number of replicas that must acknowledge " + + "min.insync.replicas specifies the minimum number of replicas that must acknowledge " + "a write for the write to be considered successful. If this minimum cannot be met, " + "then the producer will raise an exception (either NotEnoughReplicas or " + - s"NotEnoughReplicasAfterAppend).
When used together, $MinInsyncReplicasProp and acks " + + "NotEnoughReplicasAfterAppend).
When used together, min.insync.replicas and acks " + "allow you to enforce greater durability guarantees. A typical scenario would be to " + - s"create a topic with a replication factor of 3, set $MinInsyncReplicasProp to 2, and " + + "create a topic with a replication factor of 3, set min.insync.replicas to 2, and " + "produce with acks of \"all\". This will ensure that the producer raises an exception " + "if a majority of replicas do not receive a write." From 03614bde12fdd464a195c6188d762b748d9d5a45 Mon Sep 17 00:00:00 2001 From: Cheryl Simmons Date: Tue, 2 May 2023 13:02:29 -0700 Subject: [PATCH 5/9] multiple fixes --- .../clients/producer/ProducerConfig.java | 30 +++++++++---------- .../main/scala/kafka/server/KafkaConfig.scala | 22 +++++++------- .../org/apache/kafka/raft/RaftConfig.java | 6 ++-- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 6b65e4587ca3..ed68776fe820 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -281,22 +281,20 @@ public class ProducerConfig extends AbstractConfig { /** partitioner.class */ public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; - private static final String PARTITIONER_CLASS_DOC = "A class to use to determine which partition to be send to when produce the records. Available options are:" + - "
    " + - "
  • If not set, the default partitioning logic is used. " + - "This strategy will try sticking to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" + - "
      " + - "
    • If no partition is specified but a key is present, choose a partition based on a hash of the key
    • " + - "
    • If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition.
    • " + - "
    " + - "
  • " + - "
  • org.apache.kafka.clients.producer.RoundRobinPartitioner: This partitioning strategy is that " + - "each record in a series of consecutive records will be sent to a different partition(no matter if the 'key' is provided or not), " + - "until we run out of partitions and start over again. Note: There's a known issue that will cause uneven distribution when new batch is created. " + - "Please check KAFKA-9965 for more detail." + - "
  • " + - "
" + - "

Implementing the org.apache.kafka.clients.producer.Partitioner interface allows you to plug in a custom partitioner."; + private static final String PARTITIONER_CLASS_DOC = "Determines which partition to send a record to when records are produced. Available options are:" + + "

    " + + "
  • If not set, the default partitioning logic is used. " + + "This strategy sends records to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy: /n" + + "1) If no partition is specified but a key is present, choose a partition based on a hash of the key. /n" + + "2) If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition./n" + + "
  • " + + "
  • org.apache.kafka.clients.producer.RoundRobinPartitioner: A partitioning strategy where " + + "each record in a series of consecutive records is sent to a different partition, regardless of whether the 'key' is provided or not, " + + "until partitions run out and the process starts over again. Note: There's a known issue that will cause uneven distribution when a new batch is created. " + + "See KAFKA-9965 for more detail." + + "
  • " + + "
" + + "

Implementing the org.apache.kafka.clients.producer.Partitioner interface allows you to plug in a custom partitioner."; /** interceptor.classes */ public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"; diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 9e23ac4fb24a..8e951715dcb1 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -749,7 +749,7 @@ object KafkaConfig { s"are not appended to the metadata partition. The default value is ${Defaults.MetadataMaxIdleIntervalMs}"; val ControllerListenerNamesDoc = "A comma-separated list of the names of the listeners used by the controller. This is required " + "if running in KRaft mode. When communicating with the controller quorum, the broker will always use the first listener in this list.\n " + - "Note: The ZooKeeper based controller should not set this configuration." + "Note: The ZooKeeper-based controller should not set this configuration." val SaslMechanismControllerProtocolDoc = "SASL mechanism used for communication with controllers. Default is GSSAPI." val MetadataLogSegmentBytesDoc = "The maximum size of a single metadata log file." val MetadataLogSegmentMinBytesDoc = "Override the minimum size for a single metadata log file. This should be used for testing only." @@ -807,16 +807,16 @@ object KafkaConfig { "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL" + "control.plane.listener.name = CONTROLLER\n" + "On startup, the broker will start listening on \"192.1.1.8:9094\" with security protocol \"SSL\".\n" + - s"On controller side, when it discovers a broker's published endpoints through zookeeper, it will use the $ControlPlaneListenerNameProp " + + s"On the controller side, when it discovers a broker's published endpoints through ZooKeeper, it will use the $ControlPlaneListenerNameProp " + "to find the endpoint, which it will use to establish connection to the broker.\n" + - "For example, if the broker's published endpoints on ZooKeeper are :\n" + - "\"endpoints\" : [\"INTERNAL://broker1.example.com:9092\",\"EXTERNAL://broker1.example.com:9093\",\"CONTROLLER://broker1.example.com:9094\"]\n" + - " and the controller's config is :\n" + - "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" + + "For example, if the broker's published endpoints on ZooKeeper are:\n" + + " \"endpoints\" : [\"INTERNAL://broker1.example.com:9092\",\"EXTERNAL://broker1.example.com:9093\",\"CONTROLLER://broker1.example.com:9094\"]\n" + + " and the controller's config is:\n" + + "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL" + "control.plane.listener.name = CONTROLLER\n" + - "then controller will use \"broker1.example.com:9094\" with security protocol \"SSL\" to connect to the broker.\n" + + "then the controller will use \"broker1.example.com:9094\" with security protocol \"SSL\" to connect to the broker.\n" + "If not explicitly configured, the default value will be null and there will be no dedicated endpoints for controller connections.\n" + - s"If explicitly configured, the value cannot be the same as the value of $InterBrokerListenerNameProp." + s"If explicitly configured, the value cannot be the same as the value of $InterBrokerListenerNameProp." val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket server sockets. If the value is -1, the OS default will be used." 
val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used." @@ -876,9 +876,9 @@ object KafkaConfig { "records for at least the " + LogCleanerMinCompactionLagMsProp + " duration, or (ii) if the log has had " + "dirty (uncompacted) records for at most the " + LogCleanerMaxCompactionLagMsProp + " period." val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size." - val LogCleanerDeleteRetentionMsDoc = "The amount of time to retain delete tombstone markers for log compacted topics. This setting also gives a bound " + - "on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise delete " + - "tombstones may be collected before they complete their scan)."; + val LogCleanerDeleteRetentionMsDoc = "The amount of time to retain tombstone message markers for log compacted topics. This setting also gives a bound " + + "on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise " + + "tombstone messages may be collected before a consumer completes their scan)."; val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted." val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted." val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index" diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java index 3ce72a591fe1..e17de3278d31 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java @@ -57,8 +57,8 @@ public class RaftConfig { public static final String QUORUM_VOTERS_CONFIG = QUORUM_PREFIX + "voters"; public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint information for " + - "the set of voters in a comma-separated list of `{id}@{host}:{port}` entries. " + - "For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094`"; + "the set of voters in a comma-separated list of {id}@{host}:{port} entries. 
" + + "For example: 1@localhost:9092,2@localhost:9093,3@localhost:9094"; public static final List DEFAULT_QUORUM_VOTERS = Collections.emptyList(); public static final String QUORUM_ELECTION_TIMEOUT_MS_CONFIG = QUORUM_PREFIX + "election.timeout.ms"; @@ -69,7 +69,7 @@ public class RaftConfig { public static final String QUORUM_FETCH_TIMEOUT_MS_CONFIG = QUORUM_PREFIX + "fetch.timeout.ms"; public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time without a successful fetch from " + "the current leader before becoming a candidate and triggering an election for voters; Maximum time without " + - "receiving fetch from a majority of the quorum before asking around to see if there's a new epoch for leader"; + "receiving fetch from a majority of the quorum before asking around to see if there's a new epoch for leader."; public static final int DEFAULT_QUORUM_FETCH_TIMEOUT_MS = 2_000; public static final String QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG = QUORUM_PREFIX + "election.backoff.max.ms"; From c583743e2172db22aae2da9ed71647aed5609a90 Mon Sep 17 00:00:00 2001 From: Cheryl Simmons Date: Tue, 2 May 2023 13:59:49 -0700 Subject: [PATCH 6/9] removing newlines and putting in paragraphs --- .../clients/producer/ProducerConfig.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index ed68776fe820..542599d0c1d6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -282,19 +282,19 @@ public class ProducerConfig extends AbstractConfig { /** partitioner.class */ public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; private static final String PARTITIONER_CLASS_DOC = "Determines which partition to send a record to when records are produced. Available options are:" + - "

    " + - "
  • If not set, the default partitioning logic is used. " + - "This strategy sends records to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy: /n" + - "1) If no partition is specified but a key is present, choose a partition based on a hash of the key. /n" + - "2) If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition./n" + - "
  • " + - "
  • org.apache.kafka.clients.producer.RoundRobinPartitioner: A partitioning strategy where " + - "each record in a series of consecutive records is sent to a different partition, regardless of whether the 'key' is provided or not, " + - "until partitions run out and the process starts over again. Note: There's a known issue that will cause uneven distribution when a new batch is created. " + - "See KAFKA-9965 for more detail." + - "
  • " + - "
" + - "

Implementing the org.apache.kafka.clients.producer.Partitioner interface allows you to plug in a custom partitioner."; + "

    " + + "
  • If not set, the default partitioning logic is used. " + + "This strategy sends records to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" + + "

    1) If no partition is specified but a key is present, choose a partition based on a hash of the key." + + "

    2) If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition." + + "

  • " + + "
  • org.apache.kafka.clients.producer.RoundRobinPartitioner: A partitioning strategy where " + + "each record in a series of consecutive records is sent to a different partition, regardless of whether the 'key' is provided or not, " + + "until partitions run out and the process starts over again. Note: There's a known issue that will cause uneven distribution when a new batch is created. " + + "See KAFKA-9965 for more detail." + + "
  • " + + "
" + + "

Implementing the org.apache.kafka.clients.producer.Partitioner interface allows you to plug in a custom partitioner."; /** interceptor.classes */ public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"; From e3fc768ea542f0df8925cfae2646f8924fb33c23 Mon Sep 17 00:00:00 2001 From: Cheryl Simmons Date: Tue, 2 May 2023 14:04:05 -0700 Subject: [PATCH 7/9] adding back code tick' --- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 8e951715dcb1..9c81bbc88fa5 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -896,7 +896,7 @@ object KafkaConfig { "will cause consumers with older versions to break as they will receive messages with a format that they don't understand." val LogMessageTimestampTypeDoc = "Define whether the timestamp in the message is message create time or log append time. The value should be either " + - "`CreateTime` or `LogAppendTime." + "`CreateTime` or `LogAppendTime`." val LogMessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " + "a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected " + From 106b6d74cd8319c948e1974c72216d6e06b88d27 Mon Sep 17 00:00:00 2001 From: Cheryl Simmons Date: Tue, 2 May 2023 14:37:51 -0700 Subject: [PATCH 8/9] fixing indent --- .../clients/producer/ProducerConfig.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 542599d0c1d6..d6a419d38349 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -282,19 +282,19 @@ public class ProducerConfig extends AbstractConfig { /** partitioner.class */ public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; private static final String PARTITIONER_CLASS_DOC = "Determines which partition to send a record to when records are produced. Available options are:" + - "

    " + - "
  • If not set, the default partitioning logic is used. " + - "This strategy sends records to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" + - "

    1) If no partition is specified but a key is present, choose a partition based on a hash of the key." + - "

    2) If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition." + - "

  • " + - "
  • org.apache.kafka.clients.producer.RoundRobinPartitioner: A partitioning strategy where " + - "each record in a series of consecutive records is sent to a different partition, regardless of whether the 'key' is provided or not, " + - "until partitions run out and the process starts over again. Note: There's a known issue that will cause uneven distribution when a new batch is created. " + - "See KAFKA-9965 for more detail." + - "
  • " + - "
" + - "

Implementing the org.apache.kafka.clients.producer.Partitioner interface allows you to plug in a custom partitioner."; + "

    " + + "
  • If not set, the default partitioning logic is used. " + + "This strategy sends records to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" + + "

    1) If no partition is specified but a key is present, choose a partition based on a hash of the key." + + "

    2) If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition." + + "

  • " + + "
  • org.apache.kafka.clients.producer.RoundRobinPartitioner: A partitioning strategy where " + + "each record in a series of consecutive records is sent to a different partition, regardless of whether the 'key' is provided or not, " + + "until partitions run out and the process starts over again. Note: There's a known issue that will cause uneven distribution when a new batch is created. " + + "See KAFKA-9965 for more detail." + + "
  • " + + "
" + + "

Implementing the org.apache.kafka.clients.producer.Partitioner interface allows you to plug in a custom partitioner."; /** interceptor.classes */ public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"; From 4b1435ed4136424ba73a621622ca6c8c5acc3080 Mon Sep 17 00:00:00 2001 From: Cheryl Simmons Date: Thu, 25 May 2023 17:49:33 -0700 Subject: [PATCH 9/9] ading code markup --- core/src/main/scala/kafka/server/KafkaConfig.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 9c81bbc88fa5..75cea1d27df7 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -801,13 +801,13 @@ object KafkaConfig { "Note that in KRaft a default mapping from the listener names defined by controller.listener.names to PLAINTEXT " + "is assumed if no explicit mapping is provided and no other security protocol is in use." val controlPlaneListenerNameDoc = "Name of listener used for communication between controller and brokers. " + - s"Broker will use the $ControlPlaneListenerNameProp to locate the endpoint in $ListenersProp list, to listen for connections from the controller. " + + s"A broker will use the $ControlPlaneListenerNameProp to locate the endpoint in $ListenersProp list, to listen for connections from the controller. " + "For example, if a broker's config is:\n" + "listeners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094" + "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL" + "control.plane.listener.name = CONTROLLER\n" + "On startup, the broker will start listening on \"192.1.1.8:9094\" with security protocol \"SSL\".\n" + - s"On the controller side, when it discovers a broker's published endpoints through ZooKeeper, it will use the $ControlPlaneListenerNameProp " + + s"On the controller side, when it discovers a broker's published endpoints through ZooKeeper, it will use the $ControlPlaneListenerNameProp " + "to find the endpoint, which it will use to establish connection to the broker.\n" + "For example, if the broker's published endpoints on ZooKeeper are:\n" + " \"endpoints\" : [\"INTERNAL://broker1.example.com:9092\",\"EXTERNAL://broker1.example.com:9093\",\"CONTROLLER://broker1.example.com:9094\"]\n" + @@ -816,7 +816,7 @@ object KafkaConfig { "control.plane.listener.name = CONTROLLER\n" + "then the controller will use \"broker1.example.com:9094\" with security protocol \"SSL\" to connect to the broker.\n" + "If not explicitly configured, the default value will be null and there will be no dedicated endpoints for controller connections.\n" + - s"If explicitly configured, the value cannot be the same as the value of $InterBrokerListenerNameProp." + s"If explicitly configured, the value cannot be the same as the value of $InterBrokerListenerNameProp." val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket server sockets. If the value is -1, the OS default will be used." val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used."
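
The partitioner.class description reworked in the ProducerConfig changes above covers the default sticky partitioning logic and the org.apache.kafka.clients.producer.RoundRobinPartitioner alternative. Below is a minimal sketch of a producer opting into the round-robin strategy; the broker address localhost:9092, the topic name demo-topic, and the class name are illustrative assumptions, not part of this patch series.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RoundRobinPartitioner;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class RoundRobinProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // Leaving partitioner.class unset keeps the default (sticky) logic described in the doc;
            // setting it to RoundRobinPartitioner spreads consecutive records across partitions
            // regardless of the record key.
            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 10; i++) {
                    producer.send(new ProducerRecord<>("demo-topic", "key", "value-" + i));
                }
            }
        }
    }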