Skip to content

Commit

Permalink
Add subscriber-pool-size to Ditto distributed data config.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 1, 2022
1 parent 5b9d747 commit 555400c
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 17 deletions.
Expand Up @@ -34,17 +34,19 @@ public final class DefaultDistributedDataConfig implements DistributedDataConfig
private final Duration subscriptionDelay;
private final AkkaReplicatorConfig akkaReplicatorConfig;
private final int numberOfShards;
private final int subscriberPoolSize;

private DefaultDistributedDataConfig(final Config configWithFallback) {
readTimeout = configWithFallback.getDuration(DistributedDataConfigValue.READ_TIMEOUT.getConfigPath());
writeTimeout = configWithFallback.getDuration(DistributedDataConfigValue.WRITE_TIMEOUT.getConfigPath());
akkaReplicatorConfig = DefaultAkkaReplicatorConfig.of(configWithFallback);
subscriptionWriteConsistency = toWriteConsistency(configWithFallback.getString(
DistributedDataConfigValue.SUBSCRIPTION_WRITE_CONSISTENCY.getConfigPath()),
DistributedDataConfigValue.SUBSCRIPTION_WRITE_CONSISTENCY.getConfigPath()),
configWithFallback.getDuration(DistributedDataConfigValue.WRITE_TIMEOUT.getConfigPath()));
subscriptionDelay =
configWithFallback.getDuration(DistributedDataConfigValue.SUBSCRIPTION_DELAY.getConfigPath());
numberOfShards = configWithFallback.getInt(DistributedDataConfigValue.NUMBER_OF_SHARDS.getConfigPath());
subscriberPoolSize = configWithFallback.getInt(DistributedDataConfigValue.SUBSCRIBER_POOL_SIZE.getConfigPath());
}

private DefaultDistributedDataConfig(final Config configWithFallback,
Expand All @@ -54,11 +56,12 @@ private DefaultDistributedDataConfig(final Config configWithFallback,
writeTimeout = configWithFallback.getDuration(DistributedDataConfigValue.WRITE_TIMEOUT.getConfigPath());
akkaReplicatorConfig = DefaultAkkaReplicatorConfig.of(configWithFallback, replicatorName, replicatorRole);
subscriptionWriteConsistency = toWriteConsistency(configWithFallback.getString(
DistributedDataConfigValue.SUBSCRIPTION_WRITE_CONSISTENCY.getConfigPath()),
DistributedDataConfigValue.SUBSCRIPTION_WRITE_CONSISTENCY.getConfigPath()),
configWithFallback.getDuration(DistributedDataConfigValue.WRITE_TIMEOUT.getConfigPath()));
subscriptionDelay =
configWithFallback.getDuration(DistributedDataConfigValue.SUBSCRIPTION_DELAY.getConfigPath());
numberOfShards = configWithFallback.getInt(DistributedDataConfigValue.NUMBER_OF_SHARDS.getConfigPath());
subscriberPoolSize = configWithFallback.getInt(DistributedDataConfigValue.SUBSCRIBER_POOL_SIZE.getConfigPath());
}

/**
Expand Down Expand Up @@ -122,6 +125,11 @@ public int getNumberOfShards() {
return numberOfShards;
}

@Override
public int getSubscriberPoolSize() {
return subscriberPoolSize;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -135,13 +143,14 @@ public boolean equals(final Object o) {
Objects.equals(writeTimeout, that.writeTimeout) &&
Objects.equals(subscriptionWriteConsistency, that.subscriptionWriteConsistency) &&
Objects.equals(subscriptionDelay, that.subscriptionDelay) &&
Objects.equals(akkaReplicatorConfig, that.akkaReplicatorConfig);
Objects.equals(akkaReplicatorConfig, that.akkaReplicatorConfig) &&
subscriberPoolSize == that.subscriberPoolSize;
}

@Override
public int hashCode() {
return Objects.hash(readTimeout, writeTimeout, akkaReplicatorConfig, subscriptionWriteConsistency,
subscriptionDelay, numberOfShards);
subscriptionDelay, numberOfShards, subscriberPoolSize);
}

@Override
Expand All @@ -153,23 +162,17 @@ public String toString() {
", subscriptionDelay" + subscriptionDelay +
", akkaReplicatorConfig=" + akkaReplicatorConfig +
", numberOfShards=" + numberOfShards +
", subscriberPoolSize=" + subscriberPoolSize +
"]";
}

private static Replicator.WriteConsistency toWriteConsistency(final String configuredWriteConsistency,
final Duration writeTimeout) {
final Object writeConsistency;
switch (configuredWriteConsistency) {
case "local":
writeConsistency = Replicator.writeLocal();
break;
case "majority":
writeConsistency = new Replicator.WriteMajority(writeTimeout);
break;
default: // default is "all"
writeConsistency = new Replicator.WriteAll(writeTimeout);
}
return (Replicator.WriteConsistency) writeConsistency;
return switch (configuredWriteConsistency) {
case "local" -> (Replicator.WriteConsistency) Replicator.writeLocal();
case "majority" -> new Replicator.WriteMajority(writeTimeout);
default -> new Replicator.WriteAll(writeTimeout); // default is "all"
};
}

}
Expand Up @@ -68,6 +68,13 @@ public interface DistributedDataConfig {
*/
int getNumberOfShards();

/**
* The number of subscribers in each subscriber pool.
*
* @return the number of subscribers.
*/
int getSubscriberPoolSize();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code DistributedDataConfig}.
Expand Down Expand Up @@ -97,7 +104,12 @@ enum DistributedDataConfigValue implements KnownConfigValue {
/**
* The number of shards Ditto's ddata extension applies for Map keys.
*/
NUMBER_OF_SHARDS("number-of-shards", 5);
NUMBER_OF_SHARDS("number-of-shards", 5),

/**
* The number of subscribers in each subscriber pool.
*/
SUBSCRIBER_POOL_SIZE("subscriber-pool-size", 1);

private final String path;
private final Object defaultValue;
Expand Down
5 changes: 5 additions & 0 deletions internal/utils/ddata/src/main/resources/reference.conf
Expand Up @@ -56,5 +56,10 @@ ditto {
# The sharding is done based on a hash of the Address or ActorRef of the Subscriber of a service instance.
number-of-shards = 5
number-of-shards = ${?DITTO_DDATA_NUMBER_OF_SHARDS}

subscriber-pool-size = 1
subscriber-pool-size = ${?akka.remote.artery.advanced.inbound-lanes}
subscriber-pool-size = ${?REMOTE_INBOUND_LANES}
subscriber-pool-size = ${?DITTO_DDATA_SUBSCRIBER_POOL_SIZE}
}
}
Expand Up @@ -73,6 +73,8 @@ public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {
.isEqualTo(new Replicator.WriteAll(underTest.getWriteTimeout()));
softly.assertThat(underTest.getSubscriptionDelay())
.isEqualTo(DistributedDataConfig.DistributedDataConfigValue.SUBSCRIPTION_DELAY.getDefaultValue());
softly.assertThat(underTest.getSubscriberPoolSize())
.isEqualTo(DistributedDataConfig.DistributedDataConfigValue.SUBSCRIBER_POOL_SIZE.getDefaultValue());
}

@Test
Expand All @@ -89,6 +91,8 @@ public void underTestReturnsValuesOfConfigFile() {
.isEqualTo(new Replicator.WriteMajority(Duration.ofSeconds(1337)));
softly.assertThat(underTest.getSubscriptionDelay())
.isEqualTo(Duration.ofDays(1L));
softly.assertThat(underTest.getSubscriberPoolSize())
.isEqualTo(99);
}

@Test
Expand All @@ -108,6 +112,8 @@ public void underTestReturnsValuesOfConfigFileWithOverwrites() {
.isEqualTo(new Replicator.WriteMajority(Duration.ofSeconds(1337)));
softly.assertThat(underTest.getSubscriptionDelay())
.isEqualTo(Duration.ofDays(1L));
softly.assertThat(underTest.getSubscriberPoolSize())
.isEqualTo(99);
}

}
2 changes: 2 additions & 0 deletions internal/utils/ddata/src/test/resources/ditto-ddata-test.conf
Expand Up @@ -10,4 +10,6 @@ ddata {
write-timeout = 1337s
subscription-write-consistency = "majority"
subscription-delay = 1d

subscriber-pool-size = 99
}

0 comments on commit 555400c

Please sign in to comment.