diff --git a/atomix/cluster/src/main/java/io/atomix/raft/partition/RaftPartitionGroup.java b/atomix/cluster/src/main/java/io/atomix/raft/partition/RaftPartitionGroup.java index ee45a2cfe70d..ac91d18c7635 100644 --- a/atomix/cluster/src/main/java/io/atomix/raft/partition/RaftPartitionGroup.java +++ b/atomix/cluster/src/main/java/io/atomix/raft/partition/RaftPartitionGroup.java @@ -492,6 +492,22 @@ public Builder withPartitionDistributor(final PartitionDistributor partitionDist return this; } + /** + * Sets the threshold for preferring snapshot replication. The unit is number of records + * by which a follower may lag behind before the leader starts to prefer replicating snapshots + * instead of records. + * + * @param preferSnapshotReplicationThreshold the threshold to use + * @return this builder for chaining + */ + public Builder withPreferSnapshotReplicationThreshold( + final int preferSnapshotReplicationThreshold) { + config + .getPartitionConfig() + .setPreferSnapshotReplicationThreshold(preferSnapshotReplicationThreshold); + return this; + } + @Override public RaftPartitionGroup build() { return new RaftPartitionGroup(config); diff --git a/broker/src/main/java/io/camunda/zeebe/broker/partitioning/RaftPartitionGroupFactory.java b/broker/src/main/java/io/camunda/zeebe/broker/partitioning/RaftPartitionGroupFactory.java index 7066b3e9b4dc..7e2d37d33e67 100644 --- a/broker/src/main/java/io/camunda/zeebe/broker/partitioning/RaftPartitionGroupFactory.java +++ b/broker/src/main/java/io/camunda/zeebe/broker/partitioning/RaftPartitionGroupFactory.java @@ -75,7 +75,9 @@ RaftPartitionGroup buildRaftPartitionGroup( .withHeartbeatInterval(clusterCfg.getHeartbeatInterval()) .withRequestTimeout(experimentalCfg.getRaft().getRequestTimeout()) .withMaxQuorumResponseTimeout(experimentalCfg.getRaft().getMaxQuorumResponseTimeout()) - .withMinStepDownFailureCount(experimentalCfg.getRaft().getMinStepDownFailureCount()); + .withMinStepDownFailureCount(experimentalCfg.getRaft().getMinStepDownFailureCount()) + .withPreferSnapshotReplicationThreshold( + experimentalCfg.getRaft().getPreferSnapshotReplicationThreshold()); final int maxMessageSize = (int) networkCfg.getMaxMessageSizeInBytes(); diff --git a/broker/src/main/java/io/camunda/zeebe/broker/system/configuration/ExperimentalRaftCfg.java b/broker/src/main/java/io/camunda/zeebe/broker/system/configuration/ExperimentalRaftCfg.java index caa84603d302..cca657cbe27f 100644 --- a/broker/src/main/java/io/camunda/zeebe/broker/system/configuration/ExperimentalRaftCfg.java +++ b/broker/src/main/java/io/camunda/zeebe/broker/system/configuration/ExperimentalRaftCfg.java @@ -14,10 +14,12 @@ public final class ExperimentalRaftCfg implements ConfigurationEntry { private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(5); private static final Duration DEFAULT_MAX_QUORUM_RESPONSE_TIMEOUT = Duration.ofSeconds(0); private static final int DEFAULT_MIN_STEP_DOWN_FAILURE_COUNT = 3; + private static final int DEFAULT_PREFER_SNAPSHOT_REPLICATION_THRESHOLD = 100; private Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT; private Duration maxQuorumResponseTimeout = DEFAULT_MAX_QUORUM_RESPONSE_TIMEOUT; private int minStepDownFailureCount = DEFAULT_MIN_STEP_DOWN_FAILURE_COUNT; + private int preferSnapshotReplicationThreshold = DEFAULT_PREFER_SNAPSHOT_REPLICATION_THRESHOLD; public Duration getRequestTimeout() { return requestTimeout; @@ -42,4 +44,12 @@ public int getMinStepDownFailureCount() { public void setMinStepDownFailureCount(final int minStepDownFailureCount) { this.minStepDownFailureCount = minStepDownFailureCount; } + + public int getPreferSnapshotReplicationThreshold() { + return preferSnapshotReplicationThreshold; + } + + public void setPreferSnapshotReplicationThreshold(final int preferSnapshotReplicationThreshold) { + this.preferSnapshotReplicationThreshold = preferSnapshotReplicationThreshold; + } } diff --git a/broker/src/test/java/io/camunda/zeebe/broker/partitioning/RaftPartitionGroupFactoryTest.java b/broker/src/test/java/io/camunda/zeebe/broker/partitioning/RaftPartitionGroupFactoryTest.java index 39f68e8b3da0..dd459b5e6326 100644 --- a/broker/src/test/java/io/camunda/zeebe/broker/partitioning/RaftPartitionGroupFactoryTest.java +++ b/broker/src/test/java/io/camunda/zeebe/broker/partitioning/RaftPartitionGroupFactoryTest.java @@ -141,6 +141,18 @@ void shouldDisablePriorityElection() { assertThat(config.getPartitionConfig().isPriorityElectionEnabled()).isFalse(); } + @Test + void shouldSetPreferSnapshotReplicationThreshold() { + // given + brokerCfg.getExperimental().getRaft().setPreferSnapshotReplicationThreshold(1000); + + // when + final var config = buildRaftPartitionGroup(); + + // then + assertThat(config.getPartitionConfig().getPreferSnapshotReplicationThreshold()).isEqualTo(1000); + } + private RaftPartitionGroupConfig buildRaftPartitionGroup() { final var partitionGroup = factory.buildRaftPartitionGroup(brokerCfg, SNAPSHOT_STORE_FACTORY); return (RaftPartitionGroupConfig) partitionGroup.config(); diff --git a/broker/src/test/java/io/camunda/zeebe/broker/system/configuration/ExperimentalCfgTest.java b/broker/src/test/java/io/camunda/zeebe/broker/system/configuration/ExperimentalCfgTest.java index ede46ac3d8e5..b9aac61fb1ba 100644 --- a/broker/src/test/java/io/camunda/zeebe/broker/system/configuration/ExperimentalCfgTest.java +++ b/broker/src/test/java/io/camunda/zeebe/broker/system/configuration/ExperimentalCfgTest.java @@ -86,4 +86,27 @@ public void shouldSetRaftMinStepDownFailureCountFromEnv() { // then assertThat(raft.getMinStepDownFailureCount()).isEqualTo(10); } + + @Test + public void shouldSetPreferSnapshotReplicationThresholdFromConfig() { + // when + final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment); + final var raft = cfg.getExperimental().getRaft(); + + // then + assertThat(raft.getPreferSnapshotReplicationThreshold()).isEqualTo(500); + } + + @Test + public void shouldSetPreferSnapshotReplicationThresholdFromEnv() { + // given + environment.put("zeebe.broker.experimental.raft.preferSnapshotReplicationThreshold", "10"); + + // when + final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment); + final var raft = cfg.getExperimental().getRaft(); + + // then + assertThat(raft.getPreferSnapshotReplicationThreshold()).isEqualTo(10); + } } diff --git a/broker/src/test/resources/system/experimental-cfg.yaml b/broker/src/test/resources/system/experimental-cfg.yaml index fed1cb0482b1..22b717059371 100644 --- a/broker/src/test/resources/system/experimental-cfg.yaml +++ b/broker/src/test/resources/system/experimental-cfg.yaml @@ -6,5 +6,6 @@ zeebe: requestTimeout: 10s maxQuorumResponseTimeout: 8s minStepDownFailureCount: 5 + preferSnapshotReplicationThreshold: 500 queryApi: enabled: true diff --git a/dist/src/main/config/broker.standalone.yaml.template b/dist/src/main/config/broker.standalone.yaml.template index 1bfca99f8a9b..8f74c05a03d7 100644 --- a/dist/src/main/config/broker.standalone.yaml.template +++ b/dist/src/main/config/broker.standalone.yaml.template @@ -620,6 +620,12 @@ # This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_RAFT_MAXQUORUMRESPONSETIMEOUT # maxQuorumResponseTimeout = 0ms + # Threshold used by the leader to decide between replicating a snapshot or records. + # The unit is number of records by which the follower may lag behind before the leader + # prefers replicating snapshots instead of records. + # This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_RAFT_PREFERSNAPSHOTREPLICATIONTHRESHOLD. + # preferSnapshotReplicationThreshold = 100 + # Allows to configure RocksDB properties, which is used for state management. # rocksdb: # Specify custom column family options overwriting Zeebe's own defaults. diff --git a/dist/src/main/config/broker.yaml.template b/dist/src/main/config/broker.yaml.template index 4845b3636547..00471bd3cf85 100644 --- a/dist/src/main/config/broker.yaml.template +++ b/dist/src/main/config/broker.yaml.template @@ -558,6 +558,12 @@ # This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_RAFT_MAXQUORUMRESPONSETIMEOUT # maxQuorumResponseTimeout = 0ms + # Threshold used by the leader to decide between replicating a snapshot or records. + # The unit is number of records by which the follower may lag behind before the leader + # prefers replicating snapshots instead of records. + # This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_RAFT_PREFERSNAPSHOTREPLICATIONTHRESHOLD. + # preferSnapshotReplicationThreshold = 100 + # Allows to configure RocksDB properties, which is used for state management. # rocksdb: # Specify custom column family options overwriting Zeebe's own defaults.