
KAFKA-19080 The constraint on segment.bytes is not enforced at topic level #19371


Merged: 98 commits merged into apache:trunk on May 25, 2025

m1a2st
Collaborator

@m1a2st m1a2st commented Apr 4, 2025

The main issue was that we forgot to set
TopicConfig.SEGMENT_BYTES_CONFIG to at least 1024 * 1024, which
caused problems in tests with small segment sizes.

To address this, we introduced a new internal config:
LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, allowing us to set smaller
segment bytes specifically for testing purposes.

We also updated the logic so that if a user configures the topic-level
segment bytes without explicitly setting the internal config, the
internal value will no longer be returned to the user.

In addition, we removed
MetadataLogConfig#METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG and added
three new internal configurations:

  • INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG
  • INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG
  • INTERNAL_DELETE_DELAY_MILLIS_CONFIG

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

@github-actions bot added the triage, tools, storage, and small labels Apr 4, 2025
@m1a2st changed the title from "KAFKA-19080 The constraint on segment.bytes is not enforced at topic level" to "[WIP]KAFKA-19080 The constraint on segment.bytes is not enforced at topic level" Apr 4, 2025
@github-actions bot added the core label Apr 4, 2025
@m1a2st
Collaborator Author

m1a2st commented Apr 4, 2025

Hello @junrao, @chia7712
I have a question about this problem.
In KAFKA-16368, only the ServerLogConfigs constraint for log.segment.bytes was raised to a 1 MB minimum. However, the topic-level segment.bytes defined in LogConfig still accepts a minimum of 14 bytes. Many tests rely on such a small segment.bytes value to generate a large number of segments.

I think there are two possible approaches to resolve this:

  1. Add atLeast(1024 * 1024) validation in LogConfig, which would require fixing around 400 tests.
  2. Add a validator in LogManager#updateTopicConfig to validate the change request at runtime.
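
The first approach amounts to a minimum-value check on the topic-level setting. A minimal sketch, assuming plain string properties: `SegmentBytesValidator` and `MIN_SEGMENT_BYTES` are hypothetical names used only for illustration, and the real check would presumably sit in LogConfig's config definition rather than in a standalone class.

```java
import java.util.Map;

// Hypothetical sketch: reject a topic-level segment.bytes below 1 MiB.
final class SegmentBytesValidator {
    static final String SEGMENT_BYTES = "segment.bytes";
    static final int MIN_SEGMENT_BYTES = 1024 * 1024;

    // Throws if segment.bytes is present and below the minimum; an absent key passes.
    static void validate(Map<String, String> props) {
        String raw = props.get(SEGMENT_BYTES);
        if (raw == null) {
            return;
        }
        int value = Integer.parseInt(raw);
        if (value < MIN_SEGMENT_BYTES) {
            throw new IllegalArgumentException(
                "Invalid value " + value + " for " + SEGMENT_BYTES
                    + ": must be at least " + MIN_SEGMENT_BYTES);
        }
    }
}
```

With a check like this in place, any test that sets `segment.bytes` to a few hundred bytes is rejected, which is why this approach would touch so many tests.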

@junrao
Contributor

junrao commented Apr 9, 2025

@m1a2st : Perhaps we could somehow allow the tests to set a small segment.bytes.

@github-actions bot removed the small label Apr 9, 2025
@chia7712
Member

chia7712 commented Apr 9, 2025

Maybe we can add an internal config to allow tests to define a small size?

@chia7712
Member

for example:

    this.internalSegmentSize = getString(TopicConfig.INTERNAL_SEGMENT_BYTES_CONFIG);

    ...

    public long segmentSize() {
        // Prefer the internal (test-only) value when it is set; otherwise use the regular segment size.
        if (internalSegmentSize != null) return Long.parseLong(internalSegmentSize);
        return segmentSize;
    }
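
The fallback suggested above can be written as a self-contained class. This is a sketch under assumptions, not the code that was merged: `SegmentSizeConfig` is a hypothetical name, and the internal value is modelled as a nullable `Integer` rather than a string.

```java
// Hypothetical sketch: an internal, test-only value wins when it is present.
final class SegmentSizeConfig {
    private final Integer internalSegmentBytes; // null when the internal config is unset
    private final int segmentBytes;             // the regular, validated topic-level value

    SegmentSizeConfig(Integer internalSegmentBytes, int segmentBytes) {
        this.internalSegmentBytes = internalSegmentBytes;
        this.segmentBytes = segmentBytes;
    }

    // Returns the internal value if set, otherwise the regular segment size.
    int segmentSize() {
        return internalSegmentBytes != null ? internalSegmentBytes : segmentBytes;
    }
}
```

For example, `new SegmentSizeConfig(null, 1024 * 1024).segmentSize()` yields the regular value, while `new SegmentSizeConfig(100, 1024 * 1024).segmentSize()` yields the small test value.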

Contributor

@junrao junrao left a comment

@m1a2st : Thanks for the updated PR. A couple of more comments.

props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)
// Since MetadataLogConfig validates the log segment size using a minimum allowed value,
// we can safely use `internal.segment.bytes` to configure it instead of relying on `segment.bytes`.
props.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
Contributor

This could be confusing. If someone turns on the debug logging to see the config for the metadata topic, it will be surprising to see INTERNAL_SEGMENT_BYTES_CONFIG being set, even though it's not set by the users.

Member

We don't need this change now, as the internal config for the KRaft topic has been removed. Additionally, the minimum segment size of the KRaft topic (8MB) is larger than that of a regular topic (1MB). Hence, it is pretty safe to set props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)

Contributor

Hmm, does that work? This will fail LogConfig.validate(props) if a test passes in a small log segment value, right?

Member

This will fail LogConfig.validate(props) if a test passes in a small log segment value, right?

This won't happen because this PR removes support for configuring small segments for KRaft topics. The minimum KRaft topic segment size is now 8MB, which is large enough to pass LogConfig.validate(props).

We've removed the configuration directly to streamline the PR, as there are no existing tests in the code base for small-segment KRaft topics.

If configuring small segments for the KRaft topic becomes necessary, we will need to reintroduce a broker-level internal log configuration, as topic-level configuration is not available for the KRaft topic.

Contributor

Hmm, we have tests using the following constructor to pass in a small segmentBytes value.

    public MetadataLogConfig(int internalLogSegmentBytes,
                             long logSegmentMillis,
                             long retentionMaxBytes,
                             long retentionMillis,
                             int maxBatchSizeInBytes,
                             int maxFetchSizeInBytes,
                             long deleteDelayMillis) {

This will fail the validation in the following if it's passed in as TopicConfig.SEGMENT_BYTES_CONFIG, right?

    LogConfig.validate(props)
    val defaultLogConfig = new LogConfig(props)

Member

@junrao You're right. To support smaller segments for Kraft topics, we could reintroduce the broker-level internal.log.segment.bytes configuration. However, this duplicates the functionality of the topic-level internal.segment.bytes and would affect all topics on the server.

Perhaps we should revisit internal.metadata.log.segment.bytes instead. The regular topics could utilize topic-level internal.segment.bytes for smaller segments, while Kraft topics would use the broker-level internal.metadata.log.segment.bytes. This approach would require adding if-else logic when converting Kraft configurations to log configurations.

props.setProperty(if (config.useInternal) LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG else LogConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
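
The if-else conversion described above could look roughly like this in Java. It is a sketch only: `MetadataLogProps` and `toLogProps` are hypothetical names, and the key strings are written out as literals instead of referencing the Kafka constants.

```java
import java.util.Properties;

// Hypothetical sketch: choose which key carries the segment size when
// converting KRaft settings into log properties.
final class MetadataLogProps {
    static final String SEGMENT_BYTES = "segment.bytes";
    static final String INTERNAL_SEGMENT_BYTES = "internal.segment.bytes";

    // internalSegmentBytes is null unless a test explicitly asked for a small segment.
    static Properties toLogProps(int logSegmentBytes, Integer internalSegmentBytes) {
        Properties props = new Properties();
        if (internalSegmentBytes != null) {
            // The internal key is not subject to the 1 MB minimum.
            props.setProperty(INTERNAL_SEGMENT_BYTES, internalSegmentBytes.toString());
        } else {
            props.setProperty(SEGMENT_BYTES, Integer.toString(logSegmentBytes));
        }
        return props;
    }
}
```

Only one of the two keys is ever written, so a debug dump of the properties shows the internal key only when a test actually set it.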

For another, we should move the test-only constructor out of the production code. This will prevent it from being used accidentally in a production environment. This could be considered as a follow-up ticket

    public MetadataLogConfig(int internalLogSegmentBytes,
                             long logSegmentMillis,
                             long retentionMaxBytes,
                             long retentionMillis,
                             int maxBatchSizeInBytes,
                             int maxFetchSizeInBytes,
                             long deleteDelayMillis) {

Member

For another, we should move the test-only constructor out of the production code. This will prevent it from being used accidentally in a production environment. This could be considered as a follow-up ticket

We should probably address this in this PR. Otherwise, the MetadataLogConfig constructor that takes internalLogSegmentBytes will seem a bit strange in the production code.

see https://github.com/apache/kafka/blob/trunk/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java#L285 as example

Collaborator Author

I believe we shouldn’t move this constructor outside of MetadataLogConfig, because the public MetadataLogConfig(AbstractConfig config) constructor doesn’t allow us to set properties like maxBatchSizeInBytes, maxFetchSizeInBytes, or deleteDelayMillis.

Member

maxBatchSizeInBytes, maxFetchSizeInBytes, or deleteDelayMillis.

Maybe it is time to introduce internal configs for them? That would make MetadataLogConfig consistent with other config classes, which allow all variables to be configured.

@@ -446,6 +447,14 @@ public static List<String> configNames() {
return CONFIG.names().stream().sorted().toList();
}

public static List<String> excludeInternalConfigNames() {
Contributor

excludeInternalConfigNames => nonInternalConfigNames ?
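
For illustration, a helper with the suggested name might filter and sort config names as below. This is a sketch under a loudly stated assumption: it treats any name starting with `internal.` as internal, which may not be how the real method decides, and `ConfigNames` is a hypothetical class.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: list config names without the internal ones, sorted.
final class ConfigNames {
    // Assumption: internal configs are identified by the "internal." prefix.
    static List<String> nonInternalConfigNames(Set<String> allNames) {
        return allNames.stream()
            .filter(name -> !name.startsWith("internal."))
            .sorted()
            .collect(Collectors.toList());
    }
}
```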

@chia7712
Member

@m1a2st please fix the conflicts

m1a2st added 4 commits May 22, 2025 19:30
# Conflicts:
#	core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
Contributor

@junrao junrao left a comment

@m1a2st : Thanks for the updated PR. A few more comments.

logSegmentMillis: Long,
retentionMaxBytes: Long,
retentionMillis: Long,
maxBatchSizeInBytes: Int,
Contributor

Should these three be named internal? Ditto in the other tests.

props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)
if (config.internalSegmentBytes() != null)
props.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
Contributor

Hmm, config.logSegmentBytes should be config.internalLogSegmentBytes, right?

this.maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES;
this.maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES;
this.deleteDelayMillis = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT;
this.maxBatchSizeInBytes = Objects.requireNonNullElse(config.getInt(INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG), KafkaRaftClient.MAX_BATCH_SIZE_BYTES);
Contributor

Should we name maxBatchSizeInBytes and the other two with the internal prefix? Also, should we just use KafkaRaftClient.MAX_BATCH_SIZE_BYTES as the default in the config def?

}

public int logSegmentBytes() {
return logSegmentBytes;
return Objects.requireNonNullElse(internalSegmentBytes, logSegmentBytes);
Contributor

Hmm, it seems it's more intuitive to just return logSegmentBytes since we have a separate public api to return internalSegmentBytes.

Member

I think there are two styles:

    public int logSegmentBytes() {
        return Objects.requireNonNullElse(internalSegmentBytes, logSegmentBytes);
    }

    public boolean useInternalSegmentBytes() {
        return internalSegmentBytes != null;
    }

    if (config.useInternalSegmentBytes())
      props.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
    else
      props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)

and

    public int logSegmentBytes() {
        return logSegmentBytes;
    }

    public Integer internalSegmentBytes() {
        return internalSegmentBytes;
    }

    if (config.internalSegmentBytes() != null)
      props.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, config.internalSegmentBytes.toString)
    else
      props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)

I prefer the latter, since a method like useInternalSegmentBytes is unusual in config classes.

m1a2st added 3 commits May 23, 2025 20:07
# Conflicts:
#	core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
Contributor

@junrao junrao left a comment

@m1a2st : Thanks for the updated PR. The code LGTM. Are the test failures related?

@m1a2st
Collaborator Author

m1a2st commented May 23, 2025

No, I think the failed tests are not related. The main cause is that the HashSet iteration order differs; I will file a PR to fix it.

org.opentest4j.AssertionFailedError: expected: <[CoordinatorRecord(key=ShareGroupStatePartitionMetadataKey(groupId='groupId'), value=ApiMessageAndVersion(ShareGroupStatePartitionMetadataValue(initializingTopics=[], initializedTopics=[TopicPartitionsInfo(topicId=wO0yz3DfTaCC9fnfn8maXw, topicName='t1', partitions=[1, 0])], deletingTopics=[]) at version 0))]> but was: <[CoordinatorRecord(key=ShareGroupStatePartitionMetadataKey(groupId='groupId'), value=ApiMessageAndVersion(ShareGroupStatePartitionMetadataValue(initializingTopics=[], initializedTopics=[TopicPartitionsInfo(topicId=wO0yz3DfTaCC9fnfn8maXw, topicName='t1', partitions=[0, 1])], deletingTopics=[]) at version 0))]>
    at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
    at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
    at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
    at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1145)
    at app//org.apache.kafka.coordinator.group.GroupMetadataManagerTest.testShareGroupInitializeSuccess(GroupMetadataManagerTest.java:21717)
    at java.base@21.0.4/java.lang.reflect.Method.invoke(Method.java:580)
    at java.base@21.0.4/java.util.ArrayList.forEach(ArrayList.java:1596)
    at java.base@21.0.4/java.util.ArrayList.forEach(ArrayList.java:1596)

Update: https://issues.apache.org/jira/browse/KAFKA-19326
It has already been fixed by af4d048.
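
The failure above compares `partitions=[1, 0]` with `partitions=[0, 1]`, that is, the same elements in a different order. One common way to make such an assertion independent of set iteration order is to sort before comparing. The sketch below only illustrates that idea; it is not necessarily what the referenced fix does, and `PartitionOrder` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: normalise a set of partition ids before asserting on it.
final class PartitionOrder {
    // Copies the set into a sorted list so the result does not depend on iteration order.
    static List<Integer> sortedPartitions(Set<Integer> partitions) {
        List<Integer> sorted = new ArrayList<>(partitions);
        Collections.sort(sorted);
        return sorted;
    }
}
```

A test can then assert on `sortedPartitions(actual)` against a sorted expected list, regardless of how the underlying set happens to iterate.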

@m1a2st
Collaborator Author

m1a2st commented May 23, 2025

I merged trunk and retriggered the CI.

Member

@chia7712 chia7712 left a comment

@m1a2st thanks for this patch. a couple of comments are left

internalMaxFetchSizeInBytes: Int,
internalDeleteDelayMillis: Long
): MetadataLogConfig = {
val config: util.Map[String, Any] = util.Map.of(
Member

val config = util.Map.of(

Collaborator Author

We need to explicitly declare entries as Any, otherwise the compiler will infer it as long, which would cause errors when parsing the config.

@@ -248,7 +247,7 @@ public void consumerConfigMustContainStreamPartitionAssignorConfig() {
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 99_999L);
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 7L);
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host");
props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
props.put(StreamsConfig.topicPrefix("internal.segment.bytes"), 100);
Member

This is used to test that the updated configs get returned, so we can increase the configured value instead of using the internal config.

@chia7712
Member

@m1a2st could you please tweak your PR description to summarize the changes?

@m1a2st
Collaborator Author

m1a2st commented May 25, 2025

Thanks for the reminder; I've updated it.

@chia7712
Member

@junrao @m1a2st Thanks for all your patient reviews and updates. I'm going to merge this. Please feel free to leave any further comments for follow-up.

@chia7712 chia7712 merged commit bcda92b into apache:trunk May 25, 2025
24 checks passed
Member

@mumrah mumrah left a comment

@m1a2st / @chia7712 -- some follow-up comments

Comment on lines +81 to +89
public static final String INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG = "internal.max.batch.size.in.bytes";
public static final String INTERNAL_MAX_BATCH_SIZE_IN_BYTES_DOC = "The largest record batch size allowed in the metadata log, only for testing.";

public static final String INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG = "internal.max.fetch.size.in.bytes";
public static final String INTERNAL_MAX_FETCH_SIZE_IN_BYTES_DOC = "The maximum number of bytes to read when fetching from the metadata log, only for testing.";

public static final String INTERNAL_DELETE_DELAY_MILLIS_CONFIG = "internal.delete.delay.millis";
public static final String INTERNAL_DELETE_DELAY_MILLIS_DOC = "The amount of time to wait before deleting a file from the filesystem, only for testing.";

Member

Shouldn't these configs have "metadata" in there somewhere in the name? From what I can tell, these are only used by KafkaMetadataLog.

@@ -599,11 +601,7 @@ object KafkaMetadataLog extends Logging {
LogConfig.validate(props)
val defaultLogConfig = new LogConfig(props)

if (config.logSegmentBytes < config.logSegmentMinBytes) {
Member

IIUC, this logic has been replaced by the config validation system.

SEGMENT_BYTES_CONFIG has minimum of 1 MB but can be overridden by INTERNAL_SEGMENT_BYTES_CONFIG which has no minimum.

@@ -639,12 +637,6 @@ object KafkaMetadataLog extends Logging {
nodeId
)

// Print a warning if users have overridden the internal config
if (config.logSegmentMinBytes != KafkaRaftClient.MAX_BATCH_SIZE_BYTES) {
Member

Should we keep the error message if INTERNAL_SEGMENT_BYTES_CONFIG has been set? This was added to help operators avoid setting these testing-only configs in a production environment.
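
A guard of the kind asked about here could be as small as the following. This is a sketch with hypothetical names (`InternalConfigWarning`, `warningFor`) and a made-up message; the key name is passed in as a parameter because the exact config name is not assumed.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: produce a warning when a test-only config has been set.
final class InternalConfigWarning {
    // Returns a warning message if internalKey is present in configs, otherwise empty.
    static Optional<String> warningFor(Map<String, ?> configs, String internalKey) {
        if (!configs.containsKey(internalKey)) {
            return Optional.empty();
        }
        return Optional.of(
            "Overriding " + internalKey + " is only supported for testing. "
                + "Setting this value in production may lead to undefined behavior.");
    }
}
```

The caller would log the message, if any, at startup so that operators notice a test-only setting in a production configuration.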

public int logSegmentMinBytes() {
return logSegmentMinBytes;

public Integer internalSegmentBytes() {
Member

OptionalInt might be nicer here.

Collaborator Author

Since config.getInt(INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG) may return a null value, using OptionalInt is not suitable here.
If we want to wrap the result with Optional, I think Optional<Integer> would be more appropriate.
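
For comparison, both wrappers can represent a missing value. `Optional.ofNullable` accepts the nullable `Integer` directly, while `OptionalInt` needs an explicit null check first. The sketch below uses a hypothetical class name and is not taken from the PR.

```java
import java.util.Optional;
import java.util.OptionalInt;

// Hypothetical sketch: two ways to wrap a nullable Integer config value.
final class NullableIntWrappers {
    // Optional<Integer>: null maps straight to Optional.empty().
    static Optional<Integer> asOptional(Integer value) {
        return Optional.ofNullable(value);
    }

    // OptionalInt: requires an explicit null check before unboxing.
    static OptionalInt asOptionalInt(Integer value) {
        return value == null ? OptionalInt.empty() : OptionalInt.of(value);
    }
}
```

Which one reads better is a style choice; `Optional<Integer>` keeps the conversion to a single call, while `OptionalInt` avoids boxing on the read side.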

@chia7712
Member

@mumrah thanks for your reviews. We will file a minor follow-up PR later.

@m1a2st
Collaborator Author

m1a2st commented May 29, 2025

I’ve addressed the follow-up comments in #19842

chia7712 pushed a commit that referenced this pull request Jun 8, 2025
See Discussion:
#19371 (comment)

Make the following changes:
- Update the internal config names with the metadata prefix
- Add the warning message for setting
`INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG`

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
@@ -588,9 +588,11 @@ object KafkaMetadataLog extends Logging {
nodeId: Int
): KafkaMetadataLog = {
val props = new Properties()
props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, config.maxBatchSizeInBytes.toString)
props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)
Member

This is a critical bug. I opened https://issues.apache.org/jira/browse/KAFKA-19392 to fix it.

chia7712 pushed a commit that referenced this pull request Jun 10, 2025
The original `props.setProperty(TopicConfig.SEGMENT_MS_CONFIG,
config.logSegmentMillis.toString)` in the `KafkaMetadataLog` constructor
was accidentally removed in #19371. Add a test to ensure this property
is properly assigned.

Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
 <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
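
A regression check in the spirit of that commit message could verify that every expected key is present in the generated properties. This sketch is an assumption about the shape of such a check, not the test that was added; `RequiredLogProps` is a hypothetical name and the keys are written as string literals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch: report which expected log properties were never set.
final class RequiredLogProps {
    static final List<String> REQUIRED =
        List.of("max.message.bytes", "segment.bytes", "segment.ms");

    // Returns the required keys that are missing from props, in declaration order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

A test asserting that `missingKeys(props)` is empty would have caught the accidental removal of the `segment.ms` assignment.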