From e605ccf0a197d9d031eeac73dfa7ba2ee09617f8 Mon Sep 17 00:00:00 2001 From: Brett Konold Date: Wed, 4 Mar 2020 21:54:49 -0800 Subject: [PATCH 1/6] Making default min.compaction.lag.ms configurable --- .../java/org/apache/samza/config/StorageConfig.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java index b5687c87dd..293781578a 100644 --- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java @@ -165,6 +165,17 @@ private Optional getChangelogSystem() { return Optional.ofNullable(get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM))); } + /** + * Gets the configured default for stores' changelog min.compaction.lag.ms, or if not defined uses the default + * value defined in this class. + * + * @return the default changelog min.compaction.lag.ms + */ + private long getDefaultChangelogMinCompactionLagMs() { + String defaultMinCompactLagConfigName = STORE_PREFIX + "default.changelog." + MIN_COMPACTION_LAG_MS; + return getLong(defaultMinCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS + } + /** * Gets the side inputs for the store. A store can have multiple side input streams which can be * provided as a comma separated list. @@ -226,7 +237,7 @@ public long getChangelogMinCompactionLagMs(String storeName) { checkArgument(get("stores." + storeName + ".changelog.kafka." 
+ MIN_COMPACTION_LAG_MS) == null, "Use " + minCompactLagConfigName + " to set kafka min.compaction.lag.ms property."); - return getLong(minCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS); + return getLong(minCompactLagConfigName, getDefaultChangelogMinCompactionLagMs()); } /** From 5834237f20f98f29be3616544618dbbe4a4ce371 Mon Sep 17 00:00:00 2001 From: Brett Konold Date: Thu, 5 Mar 2020 17:12:02 -0800 Subject: [PATCH 2/6] Unit test for compact lag default --- .../apache/samza/config/StorageConfig.java | 2 +- .../samza/config/TestStorageConfig.java | 19 +++++++++++++------ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java index 293781578a..a8b870268f 100644 --- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java @@ -173,7 +173,7 @@ private Optional getChangelogSystem() { */ private long getDefaultChangelogMinCompactionLagMs() { String defaultMinCompactLagConfigName = STORE_PREFIX + "default.changelog." 
+ MIN_COMPACTION_LAG_MS; - return getLong(defaultMinCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS + return getLong(defaultMinCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS); } /** diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java index baecf99063..88fbbe0a6c 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java @@ -20,7 +20,9 @@ package org.apache.samza.config; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -313,13 +315,18 @@ public void testGetDropLargeMessages() { @Test public void testGetChangelogMinCompactionLagMs() { // empty config, return default lag ms + Map configMap = new HashMap<>(); assertEquals(DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS, - new StorageConfig(new MapConfig()).getChangelogMinCompactionLagMs(STORE_NAME0)); + new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0)); - long lagOverride = TimeUnit.HOURS.toMillis(6); - StorageConfig storageConfig = new StorageConfig( - new MapConfig(ImmutableMap.of(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0), - String.valueOf(lagOverride)))); - assertEquals(lagOverride, storageConfig.getChangelogMinCompactionLagMs(STORE_NAME0)); + // override with configured default + long defaultLagOverride = TimeUnit.HOURS.toMillis(8); + configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, "default"), String.valueOf(defaultLagOverride)); + assertEquals(defaultLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0)); + + // override for specific store + long storeSpecificLagOverride = 
TimeUnit.HOURS.toMillis(6); + configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0), String.valueOf(storeSpecificLagOverride)); + assertEquals(storeSpecificLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0)); } } From 62f6d18d5c3954d252a8b00594e538b3815d8691 Mon Sep 17 00:00:00 2001 From: Brett Konold Date: Mon, 16 Mar 2020 11:56:07 -0700 Subject: [PATCH 3/6] adding new configuration to docs --- .../versioned/jobs/configuration-table.html | 16 ++++++++++++++++ .../versioned/jobs/samza-configurations.md | 2 ++ 2 files changed, 18 insertions(+) diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 52047f95f5..0ebcf2c9d1 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -1585,6 +1585,22 @@

Samza Configuration Reference

+ + stores.default.changelog.min.compaction.lag.ms + 14400000 + + This property defines the default minimum period that must pass before a changelog message can be compacted. + + + + + stores.store-name.changelog.min.compaction.lag.ms + stores.default.changelog.min.compaction.lag.ms + + This property defines the minimum period that must pass before a message in the store's changelog can be compacted. + + + Consuming all Kafka topics matching a regular expression
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md index 1a2e04bb8e..dedd1c4a04 100644 --- a/docs/learn/documentation/versioned/jobs/samza-configurations.md +++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md @@ -264,6 +264,8 @@ These properties define Samza's storage mechanism for efficient [stateful stream ##### [4.1 Advanced Storage Configurations](#advanced-storage-configurations) |Name|Default|Description| |--- |--- |--- | +|stores.default.changelog.
min.compaction.lag.ms|14400000|This property defines the default minimum period that must pass before a changelog message can be compacted.| +|stores.**_store-name_**.changelog.
min.compaction.lag.ms|stores.default.changelog.
min.compaction.lag.ms|This property defines the minimum period that must pass before a message in the store's changelog can be compacted.| |stores.default.changelog.
replication.factor|2|This property defines the default number of replicas to use for the change log stream.| |stores.**_store-name_**.changelog.
replication.factor|stores.default.changelog.
replication.factor|The property defines the number of replicas to use for the change log stream.| |stores.**_store-name_**.changelog.
kafka.topic-level-property| |The property allows you to specify topic level settings for the changelog topic to be created. For e.g., you can specify the clean up policy as "stores.mystore.changelog.cleanup.policy=delete". Please refer to the [Kafka documentation](http://kafka.apache.org/documentation.html#configuration) for more topic level configurations.| From 1ffcc6650909614d8a4016942cdcdf4cf0a25cd3 Mon Sep 17 00:00:00 2001 From: Brett Konold Date: Mon, 16 Mar 2020 14:30:43 -0700 Subject: [PATCH 4/6] adding details to documentation --- .../documentation/versioned/jobs/configuration-table.html | 4 ++++ .../documentation/versioned/jobs/samza-configurations.md | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 0ebcf2c9d1..5fb20467cf 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -1590,6 +1590,8 @@

Samza Configuration Reference

14400000 This property defines the default minimum period that must pass before a changelog message can be compacted. + Be mindful that the larger this value, the larger in size changelog topics will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. + This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" (time-retention) cleanup policy, this value should be < retention.ms. @@ -1598,6 +1600,8 @@

Samza Configuration Reference

stores.default.changelog.min.compaction.lag.ms This property defines the minimum period that must pass before a message in the store's changelog can be compacted. + Be mindful that the larger this value, the larger in size the changelog topic will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. + This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" (time-retention) cleanup policy, this value should be < retention.ms. diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md index dedd1c4a04..cecfdc4a48 100644 --- a/docs/learn/documentation/versioned/jobs/samza-configurations.md +++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md @@ -264,8 +264,8 @@ These properties define Samza's storage mechanism for efficient [stateful stream ##### [4.1 Advanced Storage Configurations](#advanced-storage-configurations) |Name|Default|Description| |--- |--- |--- | -|stores.default.changelog.
min.compaction.lag.ms|14400000|This property defines the default minimum period that must pass before a changelog message can be compacted.| -|stores.**_store-name_**.changelog.
min.compaction.lag.ms|stores.default.changelog.
min.compaction.lag.ms|This property defines the minimum period that must pass before a message in the store's changelog can be compacted.| +|stores.default.changelog.
min.compaction.lag.ms|14400000|This property defines the default minimum period that must pass before a changelog message can be compacted. Be mindful that the larger this value, the larger in size changelog topics will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" (time-retention) cleanup policy, this value should be < retention.ms.| +|stores.**_store-name_**.changelog.
min.compaction.lag.ms|stores.default.changelog.
min.compaction.lag.ms|This property defines the minimum period that must pass before a message in the store's changelog can be compacted. Be mindful that the larger this value, the larger in size the changelog topic will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" (time-retention) cleanup policy, this value should be < retention.ms.| |stores.default.changelog.
replication.factor|2|This property defines the default number of replicas to use for the change log stream.| |stores.**_store-name_**.changelog.
replication.factor|stores.default.changelog.
replication.factor|The property defines the number of replicas to use for the change log stream.| |stores.**_store-name_**.changelog.
kafka.topic-level-property| |The property allows you to specify topic level settings for the changelog topic to be created. For e.g., you can specify the clean up policy as "stores.mystore.changelog.cleanup.policy=delete". Please refer to the [Kafka documentation](http://kafka.apache.org/documentation.html#configuration) for more topic level configurations.| From 286bfbc2fc2b3928aa329ed45b781f8f11f5d31e Mon Sep 17 00:00:00 2001 From: Brett Konold Date: Mon, 16 Mar 2020 14:33:54 -0700 Subject: [PATCH 5/6] fixing docs --- .../documentation/versioned/jobs/configuration-table.html | 4 ++-- .../documentation/versioned/jobs/samza-configurations.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 5fb20467cf..12d04f6bfd 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -1591,7 +1591,7 @@

Samza Configuration Reference

This property defines the default minimum period that must pass before a changelog message can be compacted. Be mindful that the larger this value, the larger in size changelog topics will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. - This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" (time-retention) cleanup policy, this value should be < retention.ms. + This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms. @@ -1601,7 +1601,7 @@

Samza Configuration Reference

This property defines the minimum period that must pass before a message in the store's changelog can be compacted. Be mindful that the larger this value, the larger in size the changelog topic will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. - This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" (time-retention) cleanup policy, this value should be < retention.ms. + This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms. diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md index cecfdc4a48..a579bc1d90 100644 --- a/docs/learn/documentation/versioned/jobs/samza-configurations.md +++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md @@ -264,8 +264,8 @@ These properties define Samza's storage mechanism for efficient [stateful stream ##### [4.1 Advanced Storage Configurations](#advanced-storage-configurations) |Name|Default|Description| |--- |--- |--- | -|stores.default.changelog.
min.compaction.lag.ms|14400000|This property defines the default minimum period that must pass before a changelog message can be compacted. Be mindful that the larger this value, the larger in size changelog topics will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" (time-retention) cleanup policy, this value should be < retention.ms.| -|stores.**_store-name_**.changelog.
min.compaction.lag.ms|stores.default.changelog.
min.compaction.lag.ms|This property defines the minimum period that must pass before a message in the store's changelog can be compacted. Be mindful that the larger this value, the larger in size the changelog topic will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" (time-retention) cleanup policy, this value should be < retention.ms.| +|stores.default.changelog.
min.compaction.lag.ms|14400000|This property defines the default minimum period that must pass before a changelog message can be compacted. Be mindful that the larger this value, the larger in size changelog topics will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.| +|stores.**_store-name_**.changelog.
min.compaction.lag.ms|stores.default.changelog.
min.compaction.lag.ms|This property defines the minimum period that must pass before a message in the store's changelog can be compacted. Be mindful that the larger this value, the larger in size the changelog topic will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.| |stores.default.changelog.
replication.factor|2|This property defines the default number of replicas to use for the change log stream.| |stores.**_store-name_**.changelog.
replication.factor|stores.default.changelog.
replication.factor|The property defines the number of replicas to use for the change log stream.| |stores.**_store-name_**.changelog.
kafka.topic-level-property| |The property allows you to specify topic level settings for the changelog topic to be created. For e.g., you can specify the clean up policy as "stores.mystore.changelog.cleanup.policy=delete". Please refer to the [Kafka documentation](http://kafka.apache.org/documentation.html#configuration) for more topic level configurations.| From c06791c17acf9025563d147a83f898b213464b61 Mon Sep 17 00:00:00 2001 From: Brett Konold Date: Mon, 16 Mar 2020 15:37:51 -0700 Subject: [PATCH 6/6] refining docs --- .../documentation/versioned/jobs/configuration-table.html | 4 ++-- .../documentation/versioned/jobs/samza-configurations.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 12d04f6bfd..b588e641fc 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -1590,7 +1590,7 @@

Samza Configuration Reference

14400000 This property defines the default minimum period that must pass before a changelog message can be compacted. - Be mindful that the larger this value, the larger in size changelog topics will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. + Be mindful that a larger value means larger changelog topics in Kafka (because more data stays un-compacted) and a longer time for your application to restore state from the changelog. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms. @@ -1600,7 +1600,7 @@

Samza Configuration Reference

stores.default.changelog.min.compaction.lag.ms This property defines the minimum period that must pass before a message in the store's changelog can be compacted. - Be mindful that the larger this value, the larger in size the changelog topic will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. + Be mindful that a larger value means a larger changelog topic in Kafka (because more data stays un-compacted) and a longer time for your application to restore state from the changelog. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms. diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md index a579bc1d90..baf1ea8ed3 100644 --- a/docs/learn/documentation/versioned/jobs/samza-configurations.md +++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md @@ -264,8 +264,8 @@ These properties define Samza's storage mechanism for efficient [stateful stream ##### [4.1 Advanced Storage Configurations](#advanced-storage-configurations) |Name|Default|Description| |--- |--- |--- | -|stores.default.changelog.
min.compaction.lag.ms|14400000|This property defines the default minimum period that must pass before a changelog message can be compacted. Be mindful that the larger this value, the larger in size changelog topics will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.| -|stores.**_store-name_**.changelog.
min.compaction.lag.ms|stores.default.changelog.
min.compaction.lag.ms|This property defines the minimum period that must pass before a message in the store's changelog can be compacted. Be mindful that the larger this value, the larger in size the changelog topic will be in Kafka due to un-compacted data and the longer your application's bootstrap time will be. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.| +|stores.default.changelog.
min.compaction.lag.ms|14400000|This property defines the default minimum period, in milliseconds, that must pass before a changelog message can be compacted. Be mindful that a larger value means larger changelog topics in Kafka (because more data stays un-compacted) and a longer time for your application to restore state from the changelog. This may impact your application's startup time if host affinity is not enabled. For changelog topics that use the "compact,delete" cleanup policy, this value should be less than retention.ms.| +|stores.**_store-name_**.changelog.
min.compaction.lag.ms|stores.default.changelog.
min.compaction.lag.ms|This property defines the minimum period, in milliseconds, that must pass before a message in the store's changelog can be compacted. Be mindful that a larger value means a larger changelog topic in Kafka (because more data stays un-compacted) and a longer time for your application to restore state from the changelog. This may impact your application's startup time if host affinity is not enabled. For changelog topics that use the "compact,delete" cleanup policy, this value should be less than retention.ms.| |stores.default.changelog.
replication.factor|2|This property defines the default number of replicas to use for the change log stream.| |stores.**_store-name_**.changelog.
replication.factor|stores.default.changelog.
replication.factor|The property defines the number of replicas to use for the change log stream.| |stores.**_store-name_**.changelog.
kafka.topic-level-property| |The property allows you to specify topic level settings for the changelog topic to be created. For e.g., you can specify the clean up policy as "stores.mystore.changelog.cleanup.policy=delete". Please refer to the [Kafka documentation](http://kafka.apache.org/documentation.html#configuration) for more topic level configurations.|