Skip to content

Commit

Permalink
Refactor primitive protocol API to support partitioned protocols.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 27, 2018
1 parent 642cd94 commit b688543
Show file tree
Hide file tree
Showing 108 changed files with 3,537 additions and 2,271 deletions.
8 changes: 4 additions & 4 deletions core/src/main/java/io/atomix/core/Atomix.java
Expand Up @@ -451,7 +451,7 @@ public Builder addProfile(Profile profile) {
* @param systemManagementGroup the system management partition group * @param systemManagementGroup the system management partition group
* @return the Atomix builder * @return the Atomix builder
*/ */
public Builder withManagementGroup(ManagedPartitionGroup systemManagementGroup) { public Builder withManagementGroup(ManagedPartitionGroup<?> systemManagementGroup) {
config.setManagementGroup(systemManagementGroup.config()); config.setManagementGroup(systemManagementGroup.config());
return this; return this;
} }
Expand All @@ -463,7 +463,7 @@ public Builder withManagementGroup(ManagedPartitionGroup systemManagementGroup)
* @return the Atomix builder * @return the Atomix builder
* @throws NullPointerException if the partition groups are null * @throws NullPointerException if the partition groups are null
*/ */
public Builder withPartitionGroups(ManagedPartitionGroup... partitionGroups) { public Builder withPartitionGroups(ManagedPartitionGroup<?>... partitionGroups) {
return withPartitionGroups(Arrays.asList(checkNotNull(partitionGroups, "partitionGroups cannot be null"))); return withPartitionGroups(Arrays.asList(checkNotNull(partitionGroups, "partitionGroups cannot be null")));
} }


Expand All @@ -474,7 +474,7 @@ public Builder withPartitionGroups(ManagedPartitionGroup... partitionGroups) {
* @return the Atomix builder * @return the Atomix builder
* @throws NullPointerException if the partition groups are null * @throws NullPointerException if the partition groups are null
*/ */
public Builder withPartitionGroups(Collection<ManagedPartitionGroup> partitionGroups) { public Builder withPartitionGroups(Collection<ManagedPartitionGroup<?>> partitionGroups) {
partitionGroups.forEach(group -> config.addPartitionGroup(group.config())); partitionGroups.forEach(group -> config.addPartitionGroup(group.config()));
return this; return this;
} }
Expand All @@ -486,7 +486,7 @@ public Builder withPartitionGroups(Collection<ManagedPartitionGroup> partitionGr
* @return the Atomix builder * @return the Atomix builder
* @throws NullPointerException if the partition group is null * @throws NullPointerException if the partition group is null
*/ */
public Builder addPartitionGroup(ManagedPartitionGroup partitionGroup) { public Builder addPartitionGroup(ManagedPartitionGroup<?> partitionGroup) {
config.addPartitionGroup(partitionGroup.config()); config.addPartitionGroup(partitionGroup.config());
return this; return this;
} }
Expand Down
Expand Up @@ -21,12 +21,16 @@
import io.atomix.core.counter.impl.AtomicCounterOperations.CompareAndSet; import io.atomix.core.counter.impl.AtomicCounterOperations.CompareAndSet;
import io.atomix.core.counter.impl.AtomicCounterOperations.GetAndAdd; import io.atomix.core.counter.impl.AtomicCounterOperations.GetAndAdd;
import io.atomix.core.counter.impl.AtomicCounterOperations.Set; import io.atomix.core.counter.impl.AtomicCounterOperations.Set;
import io.atomix.primitive.PrimitiveRegistry;
import io.atomix.primitive.impl.AbstractAsyncPrimitive; import io.atomix.primitive.impl.AbstractAsyncPrimitive;
import io.atomix.primitive.proxy.PrimitiveProxy; import io.atomix.primitive.proxy.PrimitiveProxy;
import io.atomix.utils.serializer.KryoNamespace; import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces; import io.atomix.utils.serializer.KryoNamespaces;
import io.atomix.utils.serializer.Serializer; import io.atomix.utils.serializer.Serializer;


import java.time.Duration;
import java.util.concurrent.CompletableFuture;

import static io.atomix.core.counter.impl.AtomicCounterOperations.ADD_AND_GET; import static io.atomix.core.counter.impl.AtomicCounterOperations.ADD_AND_GET;
import static io.atomix.core.counter.impl.AtomicCounterOperations.COMPARE_AND_SET; import static io.atomix.core.counter.impl.AtomicCounterOperations.COMPARE_AND_SET;
import static io.atomix.core.counter.impl.AtomicCounterOperations.DECREMENT_AND_GET; import static io.atomix.core.counter.impl.AtomicCounterOperations.DECREMENT_AND_GET;
Expand All @@ -37,20 +41,17 @@
import static io.atomix.core.counter.impl.AtomicCounterOperations.INCREMENT_AND_GET; import static io.atomix.core.counter.impl.AtomicCounterOperations.INCREMENT_AND_GET;
import static io.atomix.core.counter.impl.AtomicCounterOperations.SET; import static io.atomix.core.counter.impl.AtomicCounterOperations.SET;


import java.time.Duration;
import java.util.concurrent.CompletableFuture;

/** /**
* Atomix counter implementation. * Atomix counter implementation.
*/ */
public class AtomicCounterProxy extends AbstractAsyncPrimitive implements AsyncAtomicCounter { public class AtomicCounterProxy extends AbstractAsyncPrimitive<AsyncAtomicCounter> implements AsyncAtomicCounter {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder() private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder()
.register(KryoNamespaces.BASIC) .register(KryoNamespaces.BASIC)
.register(AtomicCounterOperations.NAMESPACE) .register(AtomicCounterOperations.NAMESPACE)
.build()); .build());


public AtomicCounterProxy(PrimitiveProxy proxy) { public AtomicCounterProxy(PrimitiveProxy proxy, PrimitiveRegistry registry) {
super(proxy); super(proxy, registry);
} }


private long nullOrZero(Long value) { private long nullOrZero(Long value) {
Expand All @@ -59,48 +60,48 @@ private long nullOrZero(Long value) {


@Override @Override
public CompletableFuture<Long> get() { public CompletableFuture<Long> get() {
return proxy.<Long>invoke(GET, SERIALIZER::decode).thenApply(this::nullOrZero); return this.<Long>invoke(getPartitionKey(), GET, SERIALIZER::decode).thenApply(this::nullOrZero);
} }


@Override @Override
public CompletableFuture<Void> set(long value) { public CompletableFuture<Void> set(long value) {
return proxy.invoke(SET, SERIALIZER::encode, new Set(value)); return this.invoke(getPartitionKey(), SET, SERIALIZER::encode, new Set(value));
} }


@Override @Override
public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) { public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
return proxy.invoke(COMPARE_AND_SET, SERIALIZER::encode, return this.invoke(getPartitionKey(), COMPARE_AND_SET, SERIALIZER::encode,
new CompareAndSet(expectedValue, updateValue), SERIALIZER::decode); new CompareAndSet(expectedValue, updateValue), SERIALIZER::decode);
} }


@Override @Override
public CompletableFuture<Long> addAndGet(long delta) { public CompletableFuture<Long> addAndGet(long delta) {
return proxy.invoke(ADD_AND_GET, SERIALIZER::encode, new AddAndGet(delta), SERIALIZER::decode); return this.invoke(getPartitionKey(), ADD_AND_GET, SERIALIZER::encode, new AddAndGet(delta), SERIALIZER::decode);
} }


@Override @Override
public CompletableFuture<Long> getAndAdd(long delta) { public CompletableFuture<Long> getAndAdd(long delta) {
return proxy.invoke(GET_AND_ADD, SERIALIZER::encode, new GetAndAdd(delta), SERIALIZER::decode); return this.invoke(getPartitionKey(), GET_AND_ADD, SERIALIZER::encode, new GetAndAdd(delta), SERIALIZER::decode);
} }


@Override @Override
public CompletableFuture<Long> incrementAndGet() { public CompletableFuture<Long> incrementAndGet() {
return proxy.invoke(INCREMENT_AND_GET, SERIALIZER::decode); return this.invoke(getPartitionKey(), INCREMENT_AND_GET, SERIALIZER::decode);
} }


@Override @Override
public CompletableFuture<Long> getAndIncrement() { public CompletableFuture<Long> getAndIncrement() {
return proxy.invoke(GET_AND_INCREMENT, SERIALIZER::decode); return this.invoke(getPartitionKey(), GET_AND_INCREMENT, SERIALIZER::decode);
} }


@Override @Override
public CompletableFuture<Long> decrementAndGet() { public CompletableFuture<Long> decrementAndGet() {
return proxy.invoke(DECREMENT_AND_GET, SERIALIZER::decode); return this.invoke(getPartitionKey(), DECREMENT_AND_GET, SERIALIZER::decode);
} }


@Override @Override
public CompletableFuture<Long> getAndDecrement() { public CompletableFuture<Long> getAndDecrement() {
return proxy.invoke(GET_AND_DECREMENT, SERIALIZER::decode); return this.invoke(getPartitionKey(), GET_AND_DECREMENT, SERIALIZER::decode);
} }


@Override @Override
Expand Down
Expand Up @@ -19,7 +19,7 @@
import io.atomix.core.counter.AtomicCounterBuilder; import io.atomix.core.counter.AtomicCounterBuilder;
import io.atomix.core.counter.AtomicCounterConfig; import io.atomix.core.counter.AtomicCounterConfig;
import io.atomix.primitive.PrimitiveManagementService; import io.atomix.primitive.PrimitiveManagementService;
import io.atomix.primitive.protocol.PrimitiveProtocol; import io.atomix.primitive.proxy.PrimitiveProxy;


import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


Expand All @@ -34,15 +34,12 @@ public AtomicCounterProxyBuilder(String name, AtomicCounterConfig config, Primit
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<AtomicCounter> buildAsync() { public CompletableFuture<AtomicCounter> buildAsync() {
PrimitiveProtocol protocol = protocol(); PrimitiveProxy proxy = protocol.newProxy(
return managementService.getPrimitiveRegistry() name(),
.createPrimitive(name(), primitiveType()) primitiveType(),
.thenCompose(info -> managementService.getPartitionService() managementService.getPartitionService().getPartitionGroup(protocol.group()));
.getPartitionGroup(protocol) return new AtomicCounterProxy(proxy, managementService.getPrimitiveRegistry())
.getPartition(name()) .connect()
.getPrimitiveClient() .thenApply(counter -> counter.sync());
.newProxy(name(), primitiveType(), protocol)
.connect()
.thenApply(proxy -> new AtomicCounterProxy(proxy).sync()));
} }
} }
Expand Up @@ -16,7 +16,6 @@
package io.atomix.core.election.impl; package io.atomix.core.election.impl;


import com.google.common.collect.Sets; import com.google.common.collect.Sets;

import io.atomix.core.election.AsyncLeaderElection; import io.atomix.core.election.AsyncLeaderElection;
import io.atomix.core.election.LeaderElection; import io.atomix.core.election.LeaderElection;
import io.atomix.core.election.Leadership; import io.atomix.core.election.Leadership;
Expand All @@ -27,11 +26,19 @@
import io.atomix.core.election.impl.LeaderElectionOperations.Promote; import io.atomix.core.election.impl.LeaderElectionOperations.Promote;
import io.atomix.core.election.impl.LeaderElectionOperations.Run; import io.atomix.core.election.impl.LeaderElectionOperations.Run;
import io.atomix.core.election.impl.LeaderElectionOperations.Withdraw; import io.atomix.core.election.impl.LeaderElectionOperations.Withdraw;
import io.atomix.primitive.PrimitiveRegistry;
import io.atomix.primitive.impl.AbstractAsyncPrimitive; import io.atomix.primitive.impl.AbstractAsyncPrimitive;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.proxy.PrimitiveProxy; import io.atomix.primitive.proxy.PrimitiveProxy;
import io.atomix.primitive.proxy.Proxy;
import io.atomix.utils.serializer.KryoNamespace; import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.Serializer; import io.atomix.utils.serializer.Serializer;


import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static io.atomix.core.election.impl.LeaderElectionEvents.CHANGE; import static io.atomix.core.election.impl.LeaderElectionEvents.CHANGE;
import static io.atomix.core.election.impl.LeaderElectionOperations.ADD_LISTENER; import static io.atomix.core.election.impl.LeaderElectionOperations.ADD_LISTENER;
import static io.atomix.core.election.impl.LeaderElectionOperations.ANOINT; import static io.atomix.core.election.impl.LeaderElectionOperations.ANOINT;
Expand All @@ -42,70 +49,59 @@
import static io.atomix.core.election.impl.LeaderElectionOperations.RUN; import static io.atomix.core.election.impl.LeaderElectionOperations.RUN;
import static io.atomix.core.election.impl.LeaderElectionOperations.WITHDRAW; import static io.atomix.core.election.impl.LeaderElectionOperations.WITHDRAW;


import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** /**
* Distributed resource providing the {@link AsyncLeaderElection} primitive. * Distributed resource providing the {@link AsyncLeaderElection} primitive.
*/ */
public class LeaderElectionProxy extends AbstractAsyncPrimitive implements AsyncLeaderElection<byte[]> { public class LeaderElectionProxy extends AbstractAsyncPrimitive<AsyncLeaderElection<byte[]>> implements AsyncLeaderElection<byte[]> {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder() private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder()
.register(LeaderElectionOperations.NAMESPACE) .register(LeaderElectionOperations.NAMESPACE)
.register(LeaderElectionEvents.NAMESPACE) .register(LeaderElectionEvents.NAMESPACE)
.build()); .build());


private final Set<LeadershipEventListener> leadershipChangeListeners = Sets.newCopyOnWriteArraySet(); private final Set<LeadershipEventListener> leadershipChangeListeners = Sets.newCopyOnWriteArraySet();


public LeaderElectionProxy(PrimitiveProxy proxy) { public LeaderElectionProxy(PrimitiveProxy proxy, PrimitiveRegistry registry) {
super(proxy); super(proxy, registry);
proxy.addStateChangeListener(state -> {
if (state == PrimitiveProxy.State.CONNECTED && isListening()) {
proxy.invoke(ADD_LISTENER);
}
});
proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
} }


private void handleEvent(List<LeadershipEvent> changes) { private void handleEvent(PartitionId partitionId, List<LeadershipEvent> changes) {
changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.onEvent(change))); changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.onEvent(change)));
} }


@Override @Override
public CompletableFuture<Leadership<byte[]>> run(byte[] id) { public CompletableFuture<Leadership<byte[]>> run(byte[] id) {
return proxy.invoke(RUN, SERIALIZER::encode, new Run(id), SERIALIZER::decode); return invoke(getPartitionKey(), RUN, SERIALIZER::encode, new Run(id), SERIALIZER::decode);
} }


@Override @Override
public CompletableFuture<Void> withdraw(byte[] id) { public CompletableFuture<Void> withdraw(byte[] id) {
return proxy.invoke(WITHDRAW, SERIALIZER::encode, new Withdraw(id)); return invoke(getPartitionKey(), WITHDRAW, SERIALIZER::encode, new Withdraw(id));
} }


@Override @Override
public CompletableFuture<Boolean> anoint(byte[] id) { public CompletableFuture<Boolean> anoint(byte[] id) {
return proxy.<Anoint, Boolean>invoke(ANOINT, SERIALIZER::encode, new Anoint(id), SERIALIZER::decode); return this.invoke(getPartitionKey(), ANOINT, SERIALIZER::encode, new Anoint(id), SERIALIZER::decode);
} }


@Override @Override
public CompletableFuture<Boolean> promote(byte[] id) { public CompletableFuture<Boolean> promote(byte[] id) {
return proxy.<Promote, Boolean>invoke(PROMOTE, SERIALIZER::encode, new Promote(id), SERIALIZER::decode); return this.invoke(getPartitionKey(), PROMOTE, SERIALIZER::encode, new Promote(id), SERIALIZER::decode);
} }


@Override @Override
public CompletableFuture<Void> evict(byte[] id) { public CompletableFuture<Void> evict(byte[] id) {
return proxy.invoke(EVICT, SERIALIZER::encode, new Evict(id)); return invoke(getPartitionKey(), EVICT, SERIALIZER::encode, new Evict(id));
} }


@Override @Override
public CompletableFuture<Leadership<byte[]>> getLeadership() { public CompletableFuture<Leadership<byte[]>> getLeadership() {
return proxy.invoke(GET_LEADERSHIP, SERIALIZER::decode); return invoke(getPartitionKey(), GET_LEADERSHIP, SERIALIZER::decode);
} }


@Override @Override
public synchronized CompletableFuture<Void> addListener(LeadershipEventListener listener) { public synchronized CompletableFuture<Void> addListener(LeadershipEventListener listener) {
if (leadershipChangeListeners.isEmpty()) { if (leadershipChangeListeners.isEmpty()) {
return proxy.invoke(ADD_LISTENER).thenRun(() -> leadershipChangeListeners.add(listener)); return invoke(getPartitionKey(), ADD_LISTENER).thenRun(() -> leadershipChangeListeners.add(listener));
} else { } else {
leadershipChangeListeners.add(listener); leadershipChangeListeners.add(listener);
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
Expand All @@ -115,7 +111,7 @@ public synchronized CompletableFuture<Void> addListener(LeadershipEventListener
@Override @Override
public synchronized CompletableFuture<Void> removeListener(LeadershipEventListener listener) { public synchronized CompletableFuture<Void> removeListener(LeadershipEventListener listener) {
if (leadershipChangeListeners.remove(listener) && leadershipChangeListeners.isEmpty()) { if (leadershipChangeListeners.remove(listener) && leadershipChangeListeners.isEmpty()) {
return proxy.invoke(REMOVE_LISTENER).thenApply(v -> null); return invoke(getPartitionKey(), REMOVE_LISTENER).thenApply(v -> null);
} }
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
Expand All @@ -124,6 +120,20 @@ private boolean isListening() {
return !leadershipChangeListeners.isEmpty(); return !leadershipChangeListeners.isEmpty();
} }


@Override
public CompletableFuture<AsyncLeaderElection<byte[]>> connect() {
return super.connect()
.thenRun(() -> {
addStateChangeListeners((partition, state) -> {
if (state == Proxy.State.CONNECTED && isListening()) {
invoke(partition, ADD_LISTENER);
}
});
addEventListeners(CHANGE, SERIALIZER::decode, this::handleEvent);
})
.thenApply(v -> this);
}

@Override @Override
public LeaderElection<byte[]> sync(Duration operationTimeout) { public LeaderElection<byte[]> sync(Duration operationTimeout) {
return new BlockingLeaderElection<>(this, operationTimeout.toMillis()); return new BlockingLeaderElection<>(this, operationTimeout.toMillis());
Expand Down
Expand Up @@ -19,7 +19,8 @@
import io.atomix.core.election.LeaderElectionBuilder; import io.atomix.core.election.LeaderElectionBuilder;
import io.atomix.core.election.LeaderElectionConfig; import io.atomix.core.election.LeaderElectionConfig;
import io.atomix.primitive.PrimitiveManagementService; import io.atomix.primitive.PrimitiveManagementService;
import io.atomix.primitive.protocol.PrimitiveProtocol; import io.atomix.primitive.proxy.PrimitiveProxy;
import io.atomix.utils.serializer.Serializer;


import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


Expand All @@ -34,14 +35,19 @@ public LeaderElectionProxyBuilder(String name, LeaderElectionConfig config, Prim
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<LeaderElection<T>> buildAsync() { public CompletableFuture<LeaderElection<T>> buildAsync() {
PrimitiveProtocol protocol = protocol(); PrimitiveProxy proxy = protocol.newProxy(
return managementService.getPrimitiveRegistry().createPrimitive(name(), primitiveType()) name(),
.thenCompose(info -> managementService.getPartitionService() primitiveType(),
.getPartitionGroup(protocol) managementService.getPartitionService().getPartitionGroup(protocol.group()));
.getPartition(name()) return new LeaderElectionProxy(proxy, managementService.getPrimitiveRegistry())
.getPrimitiveClient() .connect()
.newProxy(name(), primitiveType(), protocol) .thenApply(elector -> {
.connect() Serializer serializer = serializer();
.thenApply(proxy -> new TranscodingAsyncLeaderElection<T, byte[]>(new LeaderElectionProxy(proxy), serializer()::encode, serializer()::decode).sync())); return new TranscodingAsyncLeaderElection<T, byte[]>(
elector,
key -> serializer.encode(key),
bytes -> serializer.decode(bytes))
.sync();
});
} }
} }

0 comments on commit b688543

Please sign in to comment.