Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/learn/documentation/versioned/jobs/configuration-table.html
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,26 @@ <h1>Samza Configuration Reference</h1>
</td>
</tr>

<tr>
<td class="property" id="store-default-changelog-min-compaction-lag-ms">stores.default.changelog.min.compaction.lag.ms</td>
<td class="default">14400000</td>
<td class="description">
This property defines the default minimum period that must pass before a changelog message can be compacted.
Be mindful that the larger this value, the larger in size changelog topics will be in Kafka due to un-compacted data and the longer your application's time to restore from changelog will be.
This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.
</td>
</tr>

<tr>
<td class="property" id="store-changelog-min-compaction-lag-ms">stores.<span class="store">store-name</span>.changelog.min.compaction.lag.ms</td>
<td class="default">stores.default.changelog.min.compaction.lag.ms</td>
<td class="description">
This property defines the minimum period that must pass before a message in the store's changelog can be compacted.
Comment thread
bkonold marked this conversation as resolved.
Be mindful that the larger this value, the larger in size the changelog topic will be in Kafka due to un-compacted data and the longer your application's time to restore from changelog will be.
This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.
</td>
</tr>

<tr>
<th colspan="3" class="section" id="regex-rewriter">
Consuming all Kafka topics matching a regular expression<br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ These properties define Samza's storage mechanism for efficient [stateful stream
##### <a name="advanced-storage-configurations"></a>[4.1 Advanced Storage Configurations](#advanced-storage-configurations)
|Name|Default|Description|
|--- |--- |--- |
|stores.default.changelog.<br>min.compaction.lag.ms|14400000|This property defines the default minimum period that must pass before a changelog message can be compacted. Be mindful that the larger this value, the larger in size changelog topics will be in Kafka due to un-compacted data and the longer your application's time to restore from changelog will be. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.|
|stores.**_store-name_**.changelog.<br>min.compaction.lag.ms|stores.default.changelog.<br>min.compaction.lag.ms|This property defines the minimum period that must pass before a message in the store's changelog can be compacted. Be mindful that the larger this value, the larger in size the changelog topic will be in Kafka due to un-compacted data and the longer your application's time to restore from changelog will be. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.|
|stores.default.changelog.<br>replication.factor|2|This property defines the default number of replicas to use for the change log stream.|
|stores.**_store-name_**.changelog.<br>replication.factor|stores.default.changelog.<br>replication.factor|The property defines the number of replicas to use for the change log stream.|
|stores.**_store-name_**.changelog.<br>kafka.topic-level-property| |The property allows you to specify topic level settings for the changelog topic to be created. For e.g., you can specify the clean up policy as "stores.mystore.changelog.cleanup.policy=delete". Please refer to the [Kafka documentation](http://kafka.apache.org/documentation.html#configuration) for more topic level configurations.|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,17 @@ private Optional<String> getChangelogSystem() {
return Optional.ofNullable(get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM)));
}

/**
* Gets the configured default for stores' changelog min.compaction.lag.ms, or if not defined uses the default
* value defined in this class.
*
* @return the default changelog min.compaction.lag.ms
*/
private long getDefaultChangelogMinCompactionLagMs() {
String defaultMinCompactLagConfigName = STORE_PREFIX + "default.changelog." + MIN_COMPACTION_LAG_MS;
return getLong(defaultMinCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS);
}

/**
* Gets the side inputs for the store. A store can have multiple side input streams which can be
* provided as a comma separated list.
Expand Down Expand Up @@ -226,7 +237,7 @@ public long getChangelogMinCompactionLagMs(String storeName) {
checkArgument(get("stores." + storeName + ".changelog.kafka." + MIN_COMPACTION_LAG_MS) == null,
"Use " + minCompactLagConfigName + " to set kafka min.compaction.lag.ms property.");

return getLong(minCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS);
return getLong(minCompactLagConfigName, getDefaultChangelogMinCompactionLagMs());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.apache.samza.config;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -313,13 +315,18 @@ public void testGetDropLargeMessages() {
@Test
public void testGetChangelogMinCompactionLagMs() {
// empty config, return default lag ms
Map<String, String> configMap = new HashMap<>();
assertEquals(DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS,
new StorageConfig(new MapConfig()).getChangelogMinCompactionLagMs(STORE_NAME0));
new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));

long lagOverride = TimeUnit.HOURS.toMillis(6);
StorageConfig storageConfig = new StorageConfig(
new MapConfig(ImmutableMap.of(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0),
String.valueOf(lagOverride))));
assertEquals(lagOverride, storageConfig.getChangelogMinCompactionLagMs(STORE_NAME0));
// override with configured default
long defaultLagOverride = TimeUnit.HOURS.toMillis(8);
configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, "default"), String.valueOf(defaultLagOverride));
assertEquals(defaultLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));

// override for specific store
long storeSpecificLagOverride = TimeUnit.HOURS.toMillis(6);
configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0), String.valueOf(storeSpecificLagOverride));
assertEquals(storeSpecificLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
}
}