diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 52047f95f5..b588e641fc 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -1585,6 +1585,26 @@

Samza Configuration Reference

+ + stores.default.changelog.min.compaction.lag.ms + 14400000 + + This property defines the default minimum period, in milliseconds, that must pass before a changelog message can be compacted. + Be mindful that the larger this value, the larger the changelog topics will be in Kafka (because of un-compacted data) and the longer your application will take to restore from the changelog. + This may impact your application's startup time if host affinity is not enabled. For changelog topics which have the "compact,delete" cleanup policy, this value should be less than retention.ms. + + + + + stores.store-name.changelog.min.compaction.lag.ms + stores.default.changelog.min.compaction.lag.ms + + This property defines the minimum period, in milliseconds, that must pass before a message in the store's changelog can be compacted. + Be mindful that the larger this value, the larger the changelog topic will be in Kafka (because of un-compacted data) and the longer your application will take to restore from the changelog. + This may impact your application's startup time if host affinity is not enabled. For changelog topics which have the "compact,delete" cleanup policy, this value should be less than retention.ms. + + + Consuming all Kafka topics matching a regular expression
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md index 1a2e04bb8e..baf1ea8ed3 100644 --- a/docs/learn/documentation/versioned/jobs/samza-configurations.md +++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md @@ -264,6 +264,8 @@ These properties define Samza's storage mechanism for efficient [stateful stream ##### [4.1 Advanced Storage Configurations](#advanced-storage-configurations) |Name|Default|Description| |--- |--- |--- | +|stores.default.changelog.
min.compaction.lag.ms|14400000|This property defines the default minimum period, in milliseconds, that must pass before a changelog message can be compacted. Be mindful that the larger this value, the larger the changelog topics will be in Kafka (because of un-compacted data) and the longer your application will take to restore from the changelog. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have the "compact,delete" cleanup policy, this value should be less than retention.ms.| +|stores.**_store-name_**.changelog.
min.compaction.lag.ms|stores.default.changelog.
min.compaction.lag.ms|This property defines the minimum period, in milliseconds, that must pass before a message in the store's changelog can be compacted. Be mindful that the larger this value, the larger the changelog topic will be in Kafka (because of un-compacted data) and the longer your application will take to restore from the changelog. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have the "compact,delete" cleanup policy, this value should be less than retention.ms.| |stores.default.changelog.
replication.factor|2|This property defines the default number of replicas to use for the change log stream.| |stores.**_store-name_**.changelog.
replication.factor|stores.default.changelog.
replication.factor|The property defines the number of replicas to use for the change log stream.| |stores.**_store-name_**.changelog.
kafka.topic-level-property| |The property allows you to specify topic level settings for the changelog topic to be created. For e.g., you can specify the clean up policy as "stores.mystore.changelog.cleanup.policy=delete". Please refer to the [Kafka documentation](http://kafka.apache.org/documentation.html#configuration) for more topic level configurations.| diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java index b5687c87dd..a8b870268f 100644 --- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java @@ -165,6 +165,17 @@ private Optional getChangelogSystem() { return Optional.ofNullable(get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM))); } + /** + * Gets the configured default for stores' changelog min.compaction.lag.ms, or if not defined uses the default + * value defined in this class. + * + * @return the default changelog min.compaction.lag.ms + */ + private long getDefaultChangelogMinCompactionLagMs() { + String defaultMinCompactLagConfigName = STORE_PREFIX + "default.changelog." + MIN_COMPACTION_LAG_MS; + return getLong(defaultMinCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS); + } + /** * Gets the side inputs for the store. A store can have multiple side input streams which can be * provided as a comma separated list. @@ -226,7 +237,7 @@ public long getChangelogMinCompactionLagMs(String storeName) { checkArgument(get("stores." + storeName + ".changelog.kafka." 
+ MIN_COMPACTION_LAG_MS) == null, "Use " + minCompactLagConfigName + " to set kafka min.compaction.lag.ms property."); - return getLong(minCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS); + return getLong(minCompactLagConfigName, getDefaultChangelogMinCompactionLagMs()); } /** diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java index baecf99063..88fbbe0a6c 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java @@ -20,7 +20,9 @@ package org.apache.samza.config; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -313,13 +315,18 @@ public void testGetDropLargeMessages() { @Test public void testGetChangelogMinCompactionLagMs() { // empty config, return default lag ms + Map configMap = new HashMap<>(); assertEquals(DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS, - new StorageConfig(new MapConfig()).getChangelogMinCompactionLagMs(STORE_NAME0)); + new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0)); - long lagOverride = TimeUnit.HOURS.toMillis(6); - StorageConfig storageConfig = new StorageConfig( - new MapConfig(ImmutableMap.of(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0), - String.valueOf(lagOverride)))); - assertEquals(lagOverride, storageConfig.getChangelogMinCompactionLagMs(STORE_NAME0)); + // override with configured default + long defaultLagOverride = TimeUnit.HOURS.toMillis(8); + configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, "default"), String.valueOf(defaultLagOverride)); + assertEquals(defaultLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0)); + + // 
override for specific store + long storeSpecificLagOverride = TimeUnit.HOURS.toMillis(6); + configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0), String.valueOf(storeSpecificLagOverride)); + assertEquals(storeSpecificLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0)); } }