Skip to content

Commit

Permalink
Add max retries and retry delay options to primitive/protocol configu…
Browse files Browse the repository at this point in the history
…rations.
  • Loading branch information
kuujo committed Dec 6, 2017
1 parent 100efcd commit 2d4ad14
Show file tree
Hide file tree
Showing 44 changed files with 503 additions and 268 deletions.
Expand Up @@ -86,6 +86,8 @@ private PrimitiveProtocol newRaftProtocol(Consistency readConsistency) {
.withMinTimeout(Duration.ofSeconds(5))
.withMaxTimeout(Duration.ofSeconds(30))
.withReadConsistency(readConsistency == Consistency.LINEARIZABLE ? ReadConsistency.LINEARIZABLE : ReadConsistency.SEQUENTIAL)
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}

Expand All @@ -94,6 +96,8 @@ private PrimitiveProtocol newMultiPrimaryProtocol(Consistency consistency, Repli
.withConsistency(consistency)
.withReplication(replication)
.withBackups(backups())
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}
}
Expand Up @@ -43,8 +43,7 @@ public CompletableFuture<AtomicCounter> buildAsync() {
.getPartitionGroup(protocol)
.getPartition(name())
.getPrimitiveClient()
.proxyBuilder(name(), primitiveType(), protocol)
.build()
.newProxy(name(), primitiveType(), protocol)
.open()
.thenApply(proxy -> new AtomicCounterProxy(proxy).sync());
}
Expand Down
Expand Up @@ -134,6 +134,8 @@ private PrimitiveProtocol newRaftProtocol(Consistency readConsistency) {
.withMinTimeout(electionTimeout)
.withMaxTimeout(Duration.ofSeconds(5))
.withReadConsistency(readConsistency == Consistency.LINEARIZABLE ? ReadConsistency.LINEARIZABLE : ReadConsistency.SEQUENTIAL)
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}
}
Expand Up @@ -134,6 +134,8 @@ private PrimitiveProtocol newRaftProtocol(Consistency readConsistency) {
.withMinTimeout(electionTimeout)
.withMaxTimeout(Duration.ofSeconds(5))
.withReadConsistency(readConsistency == Consistency.LINEARIZABLE ? ReadConsistency.LINEARIZABLE : ReadConsistency.SEQUENTIAL)
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}
}
Expand Up @@ -43,8 +43,7 @@ public CompletableFuture<LeaderElection<T>> buildAsync() {
.getPartitionGroup(protocol)
.getPartition(name())
.getPrimitiveClient()
.proxyBuilder(name(), primitiveType(), protocol)
.build()
.newProxy(name(), primitiveType(), protocol)
.open()
.thenApply(proxy -> new TranscodingAsyncLeaderElection<T, byte[]>(new LeaderElectionProxy(proxy), serializer()::encode, serializer()::decode).sync());
}
Expand Down
Expand Up @@ -89,15 +89,14 @@ public void removeListener(PrimaryElectionEventListener listener) {
@SuppressWarnings("unchecked")
private AsyncLeaderElector<NodeId> newLeaderElector(Partition partition) {
PrimitiveProxy proxy = partition.getPrimitiveClient()
.proxyBuilder(PRIMITIVE_NAME, LeaderElectorType.instance(), RaftProtocol.builder()
.newProxy(PRIMITIVE_NAME, LeaderElectorType.instance(), RaftProtocol.builder()
.withMinTimeout(Duration.ofMillis(250))
.withMaxTimeout(Duration.ofSeconds(5))
.withReadConsistency(ReadConsistency.LINEARIZABLE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
.withRecoveryStrategy(RecoveryStrategy.RECOVER)
.build())
.withMaxRetries(5)
.build();
.withMaxRetries(5)
.build());
AsyncLeaderElector<byte[]> leaderElector = new LeaderElectorProxy(proxy.open().join());
return new TranscodingAsyncLeaderElector<>(leaderElector, SERIALIZER::encode, SERIALIZER::decode);
}
Expand Down
Expand Up @@ -60,7 +60,7 @@ public CompletableFuture<LeaderElector<T>> buildAsync() {
Map<PartitionId, CompletableFuture<AsyncLeaderElector<T>>> electors = Maps.newConcurrentMap();
for (Partition partition : partitions.getPartitions()) {
electors.put(partition.id(),
newLeaderElector(partition.getPrimitiveClient().proxyBuilder(name(), primitiveType(), protocol).build()));
newLeaderElector(partition.getPrimitiveClient().newProxy(name(), primitiveType(), protocol)));
}

Partitioner<String> partitioner = topic -> partitions.getPartition(topic).id();
Expand Down
Expand Up @@ -86,6 +86,8 @@ private PrimitiveProtocol newRaftProtocol(Consistency readConsistency) {
.withMinTimeout(Duration.ofSeconds(5))
.withMaxTimeout(Duration.ofSeconds(30))
.withReadConsistency(readConsistency == Consistency.LINEARIZABLE ? ReadConsistency.LINEARIZABLE : ReadConsistency.SEQUENTIAL)
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}

Expand All @@ -94,6 +96,8 @@ private PrimitiveProtocol newMultiPrimaryProtocol(Consistency consistency, Repli
.withConsistency(consistency)
.withReplication(replication)
.withBackups(backups())
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}
}
Expand Up @@ -44,8 +44,7 @@ public CompletableFuture<AtomicIdGenerator> buildAsync() {
.getPartitionGroup(protocol)
.getPartition(name())
.getPrimitiveClient()
.proxyBuilder(name(), primitiveType(), protocol)
.build()
.newProxy(name(), primitiveType(), protocol)
.open()
.thenApply(proxy -> new DelegatingIdGenerator(new AtomicCounterProxy(proxy)).sync());
}
Expand Down
Expand Up @@ -58,15 +58,14 @@ public CompletableFuture<SessionId> nextSessionId() {
public CompletableFuture<SessionIdService> open() {
PrimitiveProxy proxy = partitions.getPartition(PRIMITIVE_NAME)
.getPrimitiveClient()
.proxyBuilder(PRIMITIVE_NAME, AtomicIdGeneratorType.instance(), RaftProtocol.builder()
.newProxy(PRIMITIVE_NAME, AtomicIdGeneratorType.instance(), RaftProtocol.builder()
.withMinTimeout(Duration.ofMillis(250))
.withMaxTimeout(Duration.ofSeconds(5))
.withReadConsistency(ReadConsistency.LINEARIZABLE)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
.withRecoveryStrategy(RecoveryStrategy.RECOVER)
.build())
.withMaxRetries(5)
.build();
.withMaxRetries(5)
.build());
return proxy.open()
.thenApply(v -> {
idGenerator = new DelegatingIdGenerator(new AtomicCounterProxy(proxy));
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/io/atomix/lock/DistributedLockBuilder.java
Expand Up @@ -112,6 +112,8 @@ private PrimitiveProtocol newRaftProtocol(Consistency readConsistency) {
.withMinTimeout(lockTimeout)
.withMaxTimeout(Duration.ofSeconds(5))
.withReadConsistency(readConsistency == Consistency.LINEARIZABLE ? ReadConsistency.LINEARIZABLE : ReadConsistency.SEQUENTIAL)
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}
}
Expand Up @@ -43,8 +43,7 @@ public CompletableFuture<DistributedLock> buildAsync() {
.getPartitionGroup(protocol)
.getPartition(name())
.getPrimitiveClient()
.proxyBuilder(name(), primitiveType(), protocol)
.build()
.newProxy(name(), primitiveType(), protocol)
.open()
.thenApply(proxy -> new DistributedLockProxy(proxy).sync());
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/io/atomix/map/AtomicCounterMapBuilder.java
Expand Up @@ -86,6 +86,8 @@ private PrimitiveProtocol newRaftProtocol(Consistency readConsistency) {
.withMinTimeout(Duration.ofSeconds(5))
.withMaxTimeout(Duration.ofSeconds(30))
.withReadConsistency(readConsistency == Consistency.LINEARIZABLE ? ReadConsistency.LINEARIZABLE : ReadConsistency.SEQUENTIAL)
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}

Expand All @@ -94,6 +96,8 @@ private PrimitiveProtocol newMultiPrimaryProtocol(Consistency consistency, Repli
.withConsistency(consistency)
.withReplication(replication)
.withBackups(backups())
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}
}
4 changes: 4 additions & 0 deletions core/src/main/java/io/atomix/map/ConsistentMapBuilder.java
Expand Up @@ -111,6 +111,8 @@ private PrimitiveProtocol newRaftProtocol(Consistency readConsistency) {
.withMinTimeout(Duration.ofSeconds(5))
.withMaxTimeout(Duration.ofSeconds(30))
.withReadConsistency(readConsistency == Consistency.LINEARIZABLE ? ReadConsistency.LINEARIZABLE : ReadConsistency.SEQUENTIAL)
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}

Expand All @@ -119,6 +121,8 @@ private PrimitiveProtocol newMultiPrimaryProtocol(Consistency consistency, Repli
.withConsistency(consistency)
.withReplication(replication)
.withBackups(backups())
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}
}
Expand Up @@ -88,6 +88,8 @@ private PrimitiveProtocol newRaftProtocol(Consistency readConsistency) {
.withMinTimeout(Duration.ofSeconds(5))
.withMaxTimeout(Duration.ofSeconds(30))
.withReadConsistency(readConsistency == Consistency.LINEARIZABLE ? ReadConsistency.LINEARIZABLE : ReadConsistency.SEQUENTIAL)
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}

Expand All @@ -96,6 +98,8 @@ private PrimitiveProtocol newMultiPrimaryProtocol(Consistency consistency, Repli
.withConsistency(consistency)
.withReplication(replication)
.withBackups(backups())
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}
}
Expand Up @@ -45,8 +45,7 @@ public CompletableFuture<AtomicCounterMap<K>> buildAsync() {
.getPartitionGroup(protocol)
.getPartition(name())
.getPrimitiveClient()
.proxyBuilder(name(), primitiveType(), protocol)
.build()
.newProxy(name(), primitiveType(), protocol)
.open()
.thenApply(proxy -> {
AtomicCounterMapProxy rawMap = new AtomicCounterMapProxy(proxy);
Expand Down
Expand Up @@ -61,8 +61,7 @@ public CompletableFuture<ConsistentMap<K, V>> buildAsync() {
Map<PartitionId, CompletableFuture<AsyncConsistentMap<byte[], byte[]>>> maps = Maps.newConcurrentMap();
for (Partition partition : partitions.getPartitions()) {
maps.put(partition.id(), partition.getPrimitiveClient()
.proxyBuilder(name(), primitiveType(), protocol)
.build()
.newProxy(name(), primitiveType(), protocol)
.open()
.thenApply(proxy -> new TranscodingAsyncConsistentMap<>(
new ConsistentMapProxy(proxy),
Expand Down
Expand Up @@ -47,8 +47,7 @@ public CompletableFuture<ConsistentTreeMap<V>> buildAsync() {
.getPartitionGroup(protocol)
.getPartition(name())
.getPrimitiveClient()
.proxyBuilder(name(), primitiveType(), protocol)
.build()
.newProxy(name(), primitiveType(), protocol)
.open()
.thenApply(proxy -> {
ConsistentTreeMapProxy rawMap = new ConsistentTreeMapProxy(proxy);
Expand Down
Expand Up @@ -88,6 +88,8 @@ private PrimitiveProtocol newRaftProtocol(Consistency readConsistency) {
.withMinTimeout(Duration.ofSeconds(5))
.withMaxTimeout(Duration.ofSeconds(30))
.withReadConsistency(readConsistency == Consistency.LINEARIZABLE ? ReadConsistency.LINEARIZABLE : ReadConsistency.SEQUENTIAL)
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}

Expand All @@ -96,6 +98,8 @@ private PrimitiveProtocol newMultiPrimaryProtocol(Consistency consistency, Repli
.withConsistency(consistency)
.withReplication(replication)
.withBackups(backups())
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}
}
Expand Up @@ -47,8 +47,7 @@ public CompletableFuture<ConsistentMultimap<K, V>> buildAsync() {
.getPartitionGroup(protocol)
.getPartition(name())
.getPrimitiveClient()
.proxyBuilder(name(), primitiveType(), protocol)
.build()
.newProxy(name(), primitiveType(), protocol)
.open()
.thenApply(proxy -> {
AsyncConsistentMultimap<String, byte[]> rawMap = new ConsistentSetMultimapProxy(proxy);
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/io/atomix/queue/WorkQueueBuilder.java
Expand Up @@ -86,6 +86,8 @@ private PrimitiveProtocol newRaftProtocol(Consistency readConsistency) {
.withMinTimeout(Duration.ofSeconds(5))
.withMaxTimeout(Duration.ofSeconds(30))
.withReadConsistency(readConsistency == Consistency.LINEARIZABLE ? ReadConsistency.LINEARIZABLE : ReadConsistency.SEQUENTIAL)
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}

Expand All @@ -94,6 +96,8 @@ private PrimitiveProtocol newMultiPrimaryProtocol(Consistency consistency, Repli
.withConsistency(consistency)
.withReplication(replication)
.withBackups(backups())
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}
}
Expand Up @@ -43,8 +43,7 @@ public CompletableFuture<WorkQueue<E>> buildAsync() {
.getPartitionGroup(protocol)
.getPartition(name())
.getPrimitiveClient()
.proxyBuilder(name(), primitiveType(), protocol)
.build()
.newProxy(name(), primitiveType(), protocol)
.open()
.thenApply(proxy -> new TranscodingAsyncWorkQueue<E, byte[]>(new WorkQueueProxy(proxy), serializer()::encode, serializer()::decode).sync());
}
Expand Down
Expand Up @@ -24,6 +24,7 @@
import io.atomix.set.DistributedSetBuilder;
import io.atomix.utils.serializer.Serializer;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -87,6 +88,18 @@ public DistributedSetBuilder<E> withBackups(int numBackups) {
return this;
}

@Override
public DistributedSetBuilder<E> withMaxRetries(int maxRetries) {
mapBuilder.withMaxRetries(maxRetries);
return this;
}

@Override
public DistributedSetBuilder<E> withRetryDelay(Duration retryDelay) {
mapBuilder.withRetryDelay(retryDelay);
return this;
}

@Override
public boolean readOnly() {
return mapBuilder.readOnly();
Expand Down Expand Up @@ -132,6 +145,16 @@ public int backups() {
return mapBuilder.backups();
}

@Override
public int maxRetries() {
return mapBuilder.maxRetries();
}

@Override
public Duration retryDelay() {
return mapBuilder.retryDelay();
}

@Override
public CompletableFuture<DistributedSet<E>> buildAsync() {
return mapBuilder.buildAsync()
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/io/atomix/tree/DocumentTreeBuilder.java
Expand Up @@ -127,6 +127,8 @@ private PrimitiveProtocol newRaftProtocol(Consistency readConsistency) {
.withMinTimeout(Duration.ofSeconds(5))
.withMaxTimeout(Duration.ofSeconds(30))
.withReadConsistency(readConsistency == Consistency.LINEARIZABLE ? ReadConsistency.LINEARIZABLE : ReadConsistency.SEQUENTIAL)
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}

Expand All @@ -135,6 +137,8 @@ private PrimitiveProtocol newMultiPrimaryProtocol(Consistency consistency, Repli
.withConsistency(consistency)
.withReplication(replication)
.withBackups(backups())
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}
}
Expand Up @@ -59,8 +59,7 @@ public CompletableFuture<DocumentTree<V>> buildAsync() {
Map<PartitionId, CompletableFuture<AsyncDocumentTree<V>>> trees = Maps.newConcurrentMap();
for (Partition partition : partitions.getPartitions()) {
trees.put(partition.id(), partition.getPrimitiveClient()
.proxyBuilder(name(), primitiveType(), protocol)
.build()
.newProxy(name(), primitiveType(), protocol)
.open()
.thenApply(proxy -> {
DocumentTreeProxy rawTree = new DocumentTreeProxy(proxy);
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/io/atomix/value/AtomicValueBuilder.java
Expand Up @@ -89,6 +89,8 @@ private PrimitiveProtocol newRaftProtocol(Consistency readConsistency) {
.withMinTimeout(Duration.ofSeconds(5))
.withMaxTimeout(Duration.ofSeconds(30))
.withReadConsistency(readConsistency == Consistency.LINEARIZABLE ? ReadConsistency.LINEARIZABLE : ReadConsistency.SEQUENTIAL)
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}

Expand All @@ -97,6 +99,8 @@ private PrimitiveProtocol newMultiPrimaryProtocol(Consistency consistency, Repli
.withConsistency(consistency)
.withReplication(replication)
.withBackups(backups())
.withMaxRetries(maxRetries())
.withRetryDelay(retryDelay())
.build();
}
}
Expand Up @@ -45,8 +45,7 @@ public CompletableFuture<AtomicValue<V>> buildAsync() {
.getPartitionGroup(protocol)
.getPartition(name())
.getPrimitiveClient()
.proxyBuilder(name(), primitiveType(), protocol)
.build()
.newProxy(name(), primitiveType(), protocol)
.open()
.thenApply(proxy -> {
AtomicValueProxy value = new AtomicValueProxy(proxy);
Expand Down

0 comments on commit 2d4ad14

Please sign in to comment.