KAFKA-19080 The constraint on segment.ms is not enforced at topic level #19371
Hello @junrao, @chia7712 I think there are two possible approaches to resolve this:
@m1a2st : Perhaps we could somehow allow the tests to set a small
Maybe we can add an internal config to allow tests to define a small size? For example:
this.internalSegmentSize = getString(TopicConfig.INTERNAL_SEGMENT_BYTES_CONFIG);
...
public long segmentSize() {
    if (internalSegmentSize != null) return Long.parseLong(internalSegmentSize);
    return segmentMs;
}
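A minimal sketch of the fallback idea suggested above, assuming a nullable internal override that sits next to the regular segment size. The class and field names are hypothetical and are not Kafka's actual LogConfig.

```java
import java.util.Objects;

// Hypothetical holder, not the real LogConfig: a test-only override that,
// when present, takes precedence over the user-facing segment size.
final class SegmentSizeSketch {
    private final Integer internalSegmentBytes; // null when the internal config is unset
    private final int segmentBytes;             // regular, validated value

    SegmentSizeSketch(Integer internalSegmentBytes, int segmentBytes) {
        this.internalSegmentBytes = internalSegmentBytes;
        this.segmentBytes = segmentBytes;
    }

    // Returns the internal override if it was set, otherwise the regular value.
    int segmentSize() {
        return Objects.requireNonNullElse(internalSegmentBytes, segmentBytes);
    }
}
```

With this shape, production code paths never set the override, so they always see the validated value.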
@m1a2st : Thanks for the updated PR. A couple of more comments.
props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)
// Since MetadataLogConfig validates the log segment size using a minimum allowed value,
// we can safely use `internal.segment.bytes` to configure it instead of relying on `segment.bytes`.
props.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
This could be confusing. If someone turns on the debug logging to see the config for the metadata topic, it will be surprising to see INTERNAL_SEGMENT_BYTES_CONFIG being set, even though it's not set by the users.
We don't need this change now, as the internal config for the Kraft topic has been removed. Additionally, the minimum segment size of the Kraft topic (8MB) is larger than the minimum size of a regular segment (1MB). Hence, it is pretty safe to set props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)
Hmm, does that work? This will fail LogConfig.validate(props) if a test passes in a small log segment value, right?
This will fail LogConfig.validate(props) if a test passes in a small log segment value, right?
This won't happen because this PR removes support for configuring small segments for Kraft topics. The minimum Kraft topic segment size is now 8MB, which is large enough to pass LogConfig.validate(props).
We've removed the configuration directly to streamline the PR, as there are no existing tests in the codebase for small-segment Kraft topics.
If configuring small segments for Kraft topics becomes necessary, we will need to reintroduce a broker-level internal log configuration, as topic-level configuration is not available for Kraft topics.
Hmm, we have tests using the following constructor to pass in a small segmentBytes value.
public MetadataLogConfig(int internalLogSegmentBytes,
long logSegmentMillis,
long retentionMaxBytes,
long retentionMillis,
int maxBatchSizeInBytes,
int maxFetchSizeInBytes,
long deleteDelayMillis) {
This will fail the validation in the following if it's passed in as TopicConfig.SEGMENT_BYTES_CONFIG, right?
LogConfig.validate(props)
val defaultLogConfig = new LogConfig(props)
@junrao You're right. To support smaller segments for Kraft topics, we could reintroduce the broker-level internal.log.segment.bytes configuration. However, this duplicates the functionality of the topic-level internal.segment.bytes and would affect all topics on the server.
Perhaps we should revisit internal.metadata.log.segment.bytes instead. Regular topics could use the topic-level internal.segment.bytes for smaller segments, while Kraft topics would use the broker-level internal.metadata.log.segment.bytes. This approach would require adding if-else logic when converting Kraft configurations to log configurations.
props.setProperty(if (config.useInternal) LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG else LogConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
For another, we should move the test-only constructor out of the production code. This will prevent it from being used accidentally in a production environment. This could be considered as a follow-up ticket
public MetadataLogConfig(int internalLogSegmentBytes,
long logSegmentMillis,
long retentionMaxBytes,
long retentionMillis,
int maxBatchSizeInBytes,
int maxFetchSizeInBytes,
long deleteDelayMillis) {
For another, we should move the test-only constructor out of the production code. This will prevent it from being used accidentally in a production environment. This could be considered as a follow-up ticket
We should probably address this in this PR. Otherwise, the MetadataLogConfig constructor that takes internalLogSegmentBytes will seem a bit strange in the production code.
I believe we shouldn’t move this constructor outside of MetadataLogConfig, because the public MetadataLogConfig(AbstractConfig config) constructor doesn’t allow us to set properties like maxBatchSizeInBytes, maxFetchSizeInBytes, or deleteDelayMillis.
maxBatchSizeInBytes, maxFetchSizeInBytes, or deleteDelayMillis.
Maybe it is time to introduce internal configs for them? That would make MetadataLogConfig consistent with other config classes, which allow all variables to be configured.
@@ -446,6 +447,14 @@ public static List<String> configNames() {
    return CONFIG.names().stream().sorted().toList();
}

public static List<String> excludeInternalConfigNames() {
excludeInternalConfigNames => nonInternalConfigNames ?
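For context on what the renamed method would return, here is a small sketch that assumes internal configs can be identified by membership in a known set. The class, the sets, and their contents are hypothetical; the real LogConfig derives its names from its config definition.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for a config class that exposes its config names.
final class ConfigNamesSketch {
    // Assumed data: every defined name, and the subset flagged as internal.
    static final Set<String> ALL = Set.of("segment.bytes", "segment.ms", "internal.segment.bytes");
    static final Set<String> INTERNAL = Set.of("internal.segment.bytes");

    // All names, sorted.
    static List<String> configNames() {
        return ALL.stream().sorted().collect(Collectors.toList());
    }

    // Only the names that are not flagged internal, sorted.
    static List<String> nonInternalConfigNames() {
        return ALL.stream()
                .filter(name -> !INTERNAL.contains(name))
                .sorted()
                .collect(Collectors.toList());
    }
}
```

The name nonInternalConfigNames describes the result, whereas excludeInternalConfigNames reads like an action.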
@m1a2st please fix the conflicts
# Conflicts: # core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@m1a2st : Thanks for the updated PR. A few more comments.
logSegmentMillis: Long,
retentionMaxBytes: Long,
retentionMillis: Long,
maxBatchSizeInBytes: Int,
Should these three be named internal? Ditto in other test.
props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)
if (config.internalSegmentBytes() != null)
    props.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
Hmm, config.logSegmentBytes should be config.internalLogSegmentBytes, right?
this.maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES;
this.maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES;
this.deleteDelayMillis = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT;
this.maxBatchSizeInBytes = Objects.requireNonNullElse(config.getInt(INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG), KafkaRaftClient.MAX_BATCH_SIZE_BYTES);
Should we name maxBatchSizeInBytes and the other two with the internal prefix? Also, should we just use KafkaRaftClient.MAX_BATCH_SIZE_BYTES as the default in the config def?
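One way to read the second suggestion: if the default lives in the config definition, the getter never sees null and no fallback is needed at the call site. The sketch below uses a hypothetical, much simplified definition registry (not Kafka's ConfigDef), and the 8 MB constant is an assumed stand-in for KafkaRaftClient.MAX_BATCH_SIZE_BYTES.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified config definition: each name carries its default.
final class DefaultsSketch {
    // Assumed stand-in value for KafkaRaftClient.MAX_BATCH_SIZE_BYTES.
    static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;

    private final Map<String, Object> defaults = new HashMap<>();

    // Registers a config name together with its default value.
    DefaultsSketch define(String name, Object defaultValue) {
        defaults.put(name, defaultValue);
        return this;
    }

    // Supplied values win; anything not supplied falls back to its default,
    // so lookups of defined names never return null.
    Map<String, Object> parse(Map<String, Object> supplied) {
        Map<String, Object> parsed = new HashMap<>(defaults);
        parsed.putAll(supplied);
        return parsed;
    }
}
```

Compared with calling Objects.requireNonNullElse in the constructor, this keeps the default in one place next to the config's name and documentation.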
}

public int logSegmentBytes() {
    return logSegmentBytes;
    return Objects.requireNonNullElse(internalSegmentBytes, logSegmentBytes);
Hmm, it seems it's more intuitive to just return logSegmentBytes since we have a separate public api to return internalSegmentBytes.
I think there are two styles:
Style 1:
public int logSegmentBytes() {
    return Objects.requireNonNullElse(internalSegmentBytes, logSegmentBytes);
}
public boolean useInternalSegmentBytes() {
    return internalSegmentBytes != null;
}
if (config.useInternalSegmentBytes())
    props.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
else
    props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
Style 2:
public int logSegmentBytes() {
    return logSegmentBytes;
}
public Integer internalSegmentBytes() {
    return internalSegmentBytes;
}
if (config.internalSegmentBytes() != null)
    props.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, config.internalSegmentBytes.toString)
else
    props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
I prefer the latter since useInternalSegmentBytes is not usual in config classes.
# Conflicts: # core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@m1a2st : Thanks for the updated PR. The code LGTM. Are the test failures related?
No, I think the failed tests are not related. The main cause is that the HashSet order differs; I will file a PR to fix it.
Update: https://issues.apache.org/jira/browse/KAFKA-19326
I merged trunk and retriggered the CI.
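The linked ticket's fix is not quoted here, so the following is only a general sketch of how a test assertion can avoid depending on HashSet iteration order. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

// Hypothetical test helpers for order-insensitive comparisons.
final class OrderInsensitiveSketch {
    // Compares distinct elements only, ignoring iteration order (and duplicates).
    static boolean sameElements(Collection<String> expected, Collection<String> actual) {
        return new HashSet<>(expected).equals(new HashSet<>(actual));
    }

    // Alternatively, normalize the order before doing an ordered comparison.
    static List<String> sorted(Collection<String> values) {
        List<String> copy = new ArrayList<>(values);
        Collections.sort(copy);
        return copy;
    }
}
```

Either approach keeps the assertion stable when the underlying set iterates in a different order between runs or JDK versions.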
@m1a2st thanks for this patch. a couple of comments are left
internalMaxFetchSizeInBytes: Int,
internalDeleteDelayMillis: Long
): MetadataLogConfig = {
val config: util.Map[String, Any] = util.Map.of(
val config = util.Map.of(
We need to explicitly declare entries as Any, otherwise the compiler will infer it as long, which would cause errors when parsing the config.
@@ -248,7 +247,7 @@ public void consumerConfigMustContainStreamPartitionAssignorConfig() {
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 99_999L);
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 7L);
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host");
props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
props.put(StreamsConfig.topicPrefix("internal.segment.bytes"), 100);
This test checks that the updated configs are returned, so we can increase the configured value instead of using the internal config.
@m1a2st could you please tweak your PR description to summarize the changes?
Thanks for the reminder, I've updated it.
public static final String INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG = "internal.max.batch.size.in.bytes";
public static final String INTERNAL_MAX_BATCH_SIZE_IN_BYTES_DOC = "The largest record batch size allowed in the metadata log, only for testing.";

public static final String INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG = "internal.max.fetch.size.in.bytes";
public static final String INTERNAL_MAX_FETCH_SIZE_IN_BYTES_DOC = "The maximum number of bytes to read when fetching from the metadata log, only for testing.";

public static final String INTERNAL_DELETE_DELAY_MILLIS_CONFIG = "internal.delete.delay.millis";
public static final String INTERNAL_DELETE_DELAY_MILLIS_DOC = "The amount of time to wait before deleting a file from the filesystem, only for testing.";
Shouldn't these configs have "metadata" in there somewhere in the name? From what I can tell, these are only used by KafkaMetadataLog.
@@ -599,11 +601,7 @@ object KafkaMetadataLog extends Logging {
LogConfig.validate(props)
val defaultLogConfig = new LogConfig(props)

if (config.logSegmentBytes < config.logSegmentMinBytes) {
IIUC, this logic has been replaced by the config validation system.
SEGMENT_BYTES_CONFIG has minimum of 1 MB but can be overridden by INTERNAL_SEGMENT_BYTES_CONFIG which has no minimum.
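The rule described in this comment can be sketched as follows. This is an illustration only: the class and method are hypothetical, the key strings are taken from the names used in this thread, and the 1 GiB default is an assumed value. The real check lives in Kafka's config validation.

```java
import java.util.Properties;

// Hypothetical validator mirroring the rule: segment.bytes has a 1 MB minimum,
// while the internal key bypasses that minimum.
final class SegmentValidationSketch {
    static final String SEGMENT_BYTES = "segment.bytes";
    static final String INTERNAL_SEGMENT_BYTES = "internal.segment.bytes";
    static final int MIN_SEGMENT_BYTES = 1024 * 1024;
    static final int DEFAULT_SEGMENT_BYTES = 1024 * 1024 * 1024; // assumed default

    static int validatedSegmentBytes(Properties props) {
        String internal = props.getProperty(INTERNAL_SEGMENT_BYTES);
        if (internal != null) {
            // Testing-only override: no minimum is enforced.
            return Integer.parseInt(internal);
        }
        int value = Integer.parseInt(
                props.getProperty(SEGMENT_BYTES, String.valueOf(DEFAULT_SEGMENT_BYTES)));
        if (value < MIN_SEGMENT_BYTES) {
            throw new IllegalArgumentException(
                    SEGMENT_BYTES + " must be at least " + MIN_SEGMENT_BYTES + " but was " + value);
        }
        return value;
    }
}
```

Under this rule, a test that needs a 100-byte segment sets the internal key, while the same value passed through segment.bytes is rejected.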
@@ -639,12 +637,6 @@ object KafkaMetadataLog extends Logging {
nodeId
)

// Print a warning if users have overridden the internal config
if (config.logSegmentMinBytes != KafkaRaftClient.MAX_BATCH_SIZE_BYTES) {
Should we keep the error message if INTERNAL_SEGMENT_BYTES_CONFIG has been set? This was added to help operators avoid setting these testing-only configs in a production environment.
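A sketch of what keeping such a message could look like, assuming the internal key is named internal.metadata.log.segment.bytes as discussed earlier in the thread. The class, method, and message wording are hypothetical; the sketch returns the message instead of logging it so that it stays self-contained.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper: produces a warning when a testing-only config is set.
final class InternalConfigWarningSketch {
    static final String INTERNAL_KEY = "internal.metadata.log.segment.bytes";

    // Returns a warning message if the internal key is present, otherwise empty.
    static Optional<String> warningFor(Properties props) {
        String value = props.getProperty(INTERNAL_KEY);
        if (value == null) {
            return Optional.empty();
        }
        return Optional.of("Overriding " + INTERNAL_KEY + " is only supported for testing. "
                + "Setting it to " + value + " in a production environment is not recommended.");
    }
}
```

A caller would pass the returned message, when present, to its logger at startup.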
public int logSegmentMinBytes() {
    return logSegmentMinBytes;

public Integer internalSegmentBytes() {
OptionalInt might be nicer here.
Since config.getInt(INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG) may return a null value, using OptionalInt is not suitable here.
If we want to wrap the result with Optional, I think Optional<Integer> would be more appropriate.
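To make the comparison concrete, here is a sketch of both wrappers applied to a nullable Integer. The class and method names are hypothetical, and neither variant is presented as the choice made in the PR.

```java
import java.util.Optional;
import java.util.OptionalInt;

// Hypothetical helpers wrapping a possibly-null Integer config value.
final class NullableIntSketch {
    // Optional.ofNullable accepts null directly.
    static Optional<Integer> asOptional(Integer value) {
        return Optional.ofNullable(value);
    }

    // OptionalInt has no ofNullable factory, so the null check must be explicit.
    static OptionalInt asOptionalInt(Integer value) {
        return value == null ? OptionalInt.empty() : OptionalInt.of(value);
    }
}
```

The difference is mainly ergonomic: Optional<Integer> wraps the nullable result in one call, while OptionalInt avoids boxing at the cost of the extra branch.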
@mumrah thanks for your reviews. We will file a minor later
I’ve addressed the follow-up comments on #19842
See Discussion: #19371 (comment)
Do the following changes:
- Update the internal config name with metadata prefix
- Add the warning message for setting `INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG`
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
@@ -588,9 +588,11 @@ object KafkaMetadataLog extends Logging {
nodeId: Int
): KafkaMetadataLog = {
val props = new Properties()
props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, config.maxBatchSizeInBytes.toString)
props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)
this is a critical bug - open https://issues.apache.org/jira/browse/KAFKA-19392 to fix it
The original `props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)` in the `KafkaMetadataLog` constructor was accidentally removed in #19371. Add a test to ensure this property is properly assigned.
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
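The test added by that fix is not shown in this thread. As a rough sketch of a check of that kind, the hypothetical helper below mirrors the three setProperty calls from the diff above, using the string keys behind the TopicConfig constants, so that a regression check can assert each key is present.

```java
import java.util.Properties;

// Hypothetical helper mirroring the property assignments in the diff above.
final class MetadataLogPropsSketch {
    static Properties toLogProps(int maxBatchSizeInBytes, int logSegmentBytes, long logSegmentMillis) {
        Properties props = new Properties();
        props.setProperty("max.message.bytes", Integer.toString(maxBatchSizeInBytes));
        props.setProperty("segment.bytes", Integer.toString(logSegmentBytes));
        // The assignment that was accidentally dropped and later restored.
        props.setProperty("segment.ms", Long.toString(logSegmentMillis));
        return props;
    }
}
```

A check that reads each key back from the resulting Properties would fail as soon as one of the assignments disappears.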
The main issue was that we forgot to set TopicConfig.SEGMENT_BYTES_CONFIG to at least 1024 * 1024, which caused problems in tests with small segment sizes.
To address this, we introduced a new internal config, LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, allowing us to set smaller segment bytes specifically for testing purposes.
We also updated the logic so that if a user configures the topic-level segment bytes without explicitly setting the internal config, the internal value will no longer be returned to the user.
In addition, we removed MetadataLogConfig#METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG and added three new internal configurations:
- INTERNAL_MAX_BATCH_SIZE_IN_BYTES_CONFIG
- INTERNAL_MAX_FETCH_SIZE_IN_BYTES_CONFIG
- INTERNAL_DELETE_DELAY_MILLIS_CONFIG
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>