Skip to content

Commit

Permalink
Merge pull request #109 from atomix/demote-on-leave
Browse files Browse the repository at this point in the history
Replace leaving members to preserve the quorum size
  • Loading branch information
kuujo committed Feb 10, 2016
2 parents 9866560 + f57710e commit c054a8b
Show file tree
Hide file tree
Showing 14 changed files with 491 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,17 @@ private void resignLeader(boolean toCandidate) {
*/
private void electLeader() {
Commit<GroupCommands.Join> commit = candidates.poll();
if (commit != null) {
leader = commit;
for (Session session : sessions) {
if (session.state() == Session.State.OPEN) {
session.publish("elect", leader.operation().member());
while (commit != null) {
if (commit.session().state() == Session.State.EXPIRED || commit.session().state() == Session.State.CLOSED) {
commit = candidates.poll();
} else {
leader = commit;
for (Session session : sessions) {
if (session.state() == Session.State.OPEN) {
session.publish("elect", leader.operation().member());
}
}
break;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,17 @@ public void close(Session session) {
if (lock != null && lock.session().id() == session.id()) {
lock.close();
lock = queue.poll();
if (lock != null) {
while (lock != null) {
Scheduled timer = timers.remove(lock.index());
if (timer != null)
timer.cancel();
lock.session().publish("lock", lock.index());

if (lock.session().state() == Session.State.EXPIRED || lock.session().state() == Session.State.CLOSED) {
lock = queue.poll();
} else {
lock.session().publish("lock", lock.index());
break;
}
}
}
}
Expand Down Expand Up @@ -103,11 +109,17 @@ public void unlock(Commit<LockCommands.Unlock> commit) {
lock.close();

lock = queue.poll();
if (lock != null) {
while (lock != null) {
Scheduled timer = timers.remove(lock.index());
if (timer != null)
timer.cancel();
lock.session().publish("lock", lock.index());

if (lock.session().state() == Session.State.EXPIRED || lock.session().state() == Session.State.CLOSED) {
lock = queue.poll();
} else {
lock.session().publish("lock", lock.index());
break;
}
}
}
} finally {
Expand Down
18 changes: 15 additions & 3 deletions core/src/main/java/io/atomix/Atomix.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,36 +203,48 @@ public CompletableFuture<Boolean> exists(String key) {

@Override
public CompletableFuture<Set<String>> keys() {
return client.keys();
return client.keys().thenApply(this::cleanKeys);
}

@Override
public <T extends Resource> CompletableFuture<Set<String>> keys(Class<? super T> type) {
return client.keys(type);
return client.keys(type).thenApply(this::cleanKeys);
}

@Override
public CompletableFuture<Set<String>> keys(ResourceType type) {
return client.keys(type);
return client.keys(type).thenApply(this::cleanKeys);
}

/**
* Cleans the key set.
*/
private Set<String> cleanKeys(Set<String> keys) {
keys.remove("");
return keys;
}

@Override
public <T extends Resource> CompletableFuture<T> get(String key, Class<? super T> type) {
Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length");
return client.get(key, type);
}

@Override
public <T extends Resource<T, U>, U extends Resource.Options> CompletableFuture<T> get(String key, Class<? super T> type, U options) {
Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length");
return client.get(key, type, options);
}

@Override
public <T extends Resource> CompletableFuture<T> get(String key, ResourceType type) {
Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length");
return client.get(key, type);
}

@Override
public <T extends Resource<T, U>, U extends Resource.Options> CompletableFuture<T> get(String key, ResourceType type, U options) {
Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length");
return client.get(key, type, options);
}

Expand Down
140 changes: 119 additions & 21 deletions core/src/main/java/io/atomix/AtomixReplica.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.PropertiesReader;
import io.atomix.catalyst.util.concurrent.ThreadContext;
import io.atomix.coordination.DistributedLock;
import io.atomix.copycat.client.Command;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.client.Query;
Expand Down Expand Up @@ -186,6 +187,8 @@ public static Builder builder(Address clientAddress, Address serverAddress, Coll

private final ResourceServer server;
private final ClusterBalancer balancer;
private DistributedLock lock;
private boolean locking;

public AtomixReplica(Properties properties) {
this(builder(properties));
Expand Down Expand Up @@ -225,19 +228,46 @@ private void registerListeners() {
* Balances the cluster.
*/
private void balance() {
if (server.server().cluster().member().equals(server.server().cluster().leader())) {
balancer.balance(server.server().cluster());
if (lock != null && !locking && server.server().cluster().member().equals(server.server().cluster().leader())) {
locking = true;
lock.lock()
.thenCompose(v -> balancer.balance(server.server().cluster()))
.whenComplete((r1, e1) -> lock.unlock().whenComplete((r2, e2) -> locking = false));
}
}

@Override
public CompletableFuture<Atomix> open() {
return server.open().thenRun(this::registerListeners).thenCompose(v -> super.open()).thenApply(v -> this);
return server.open()
.thenRun(this::registerListeners)
.thenCompose(v -> super.open())
.thenCompose(v -> client.get("", DistributedLock.class))
.thenApply(lock -> {
this.lock = lock;
return this;
});
}

@Override
public CompletableFuture<Void> close() {
return super.close().thenCompose(v -> server.close());
CompletableFuture<Void> future = new CompletableFuture<>();
lock.lock()
.thenCompose(v -> balancer.replace(server.server().cluster()))
.whenComplete((r1, e1) -> {
balancer.close();
lock.unlock().whenComplete((r2, e2) -> {
super.close().whenComplete((r3, e3) -> {
server.close().whenComplete((r4, e4) -> {
if (e4 == null) {
future.complete(null);
} else {
future.completeExceptionally(e4);
}
});
});
});
});
return future;
}

/**
Expand Down Expand Up @@ -279,7 +309,7 @@ public static class Builder extends io.atomix.catalyst.util.Builder<AtomixReplic
private Builder(Address clientAddress, Address serverAddress, Collection<Address> members) {
Serializer serializer = new Serializer(new ServiceLoaderTypeResolver());
this.clientAddress = Assert.notNull(clientAddress, "clientAddress");
this.clientBuilder = CopycatClient.builder(Collections.singleton(clientAddress)).withSerializer(serializer.clone());
this.clientBuilder = CopycatClient.builder(members).withSerializer(serializer.clone());
this.serverBuilder = CopycatServer.builder(clientAddress, serverAddress, members).withSerializer(serializer.clone());
this.quorumHint = members.size();
}
Expand Down Expand Up @@ -463,17 +493,33 @@ public Builder withSessionTimeout(Duration sessionTimeout) {
return this;
}

/**
* Builds the replica transports.
*/
private void buildTransport() {
// If no transport was configured by the user, attempt to load the Netty transport.
if (serverTransport == null) {
try {
serverTransport = (Transport) Class.forName("io.atomix.catalyst.transport.NettyTransport").newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new ConfigurationException("transport not configured");
}
}
}

/**
* Builds a client for the replica.
*/
private ResourceClient buildClient() {
buildTransport();

// Resolve resources.
resourceResolver.resolve(registry);

// Configure the client and server with a transport that routes all local client communication
// directly through the local server, ensuring we don't incur unnecessary network traffic by
// sending operations to a remote server when a local server is already available in the same JVM.=
clientBuilder.withTransport(new LocalTransport(localRegistry))
clientBuilder.withTransport(new CombinedClientTransport(clientAddress, new LocalTransport(localRegistry), clientTransport != null ? clientTransport : serverTransport))
.withServerSelectionStrategy(new CombinedSelectionStrategy(clientAddress));
return new ResourceClient(new CombinedCopycatClient(clientBuilder.build(), serverTransport), registry);
}
Expand All @@ -482,22 +528,13 @@ private ResourceClient buildClient() {
* Builds a server for the replica.
*/
private ResourceServer buildServer() {
// If no transport was configured by the user, attempt to load the Netty transport.
if (serverTransport == null) {
try {
serverTransport = (Transport) Class.forName("io.atomix.catalyst.transport.NettyTransport").newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new ConfigurationException("transport not configured");
}
}

// Construct the underlying CopycatServer. The server should have been configured with a CombinedTransport
// that facilitates the local client connecting directly to the server.
if (clientTransport != null) {
serverBuilder.withClientTransport(new CombinedTransport(new LocalTransport(localRegistry), clientTransport))
serverBuilder.withClientTransport(new CombinedServerTransport(new LocalTransport(localRegistry), clientTransport))
.withServerTransport(serverTransport);
} else {
serverBuilder.withTransport(new CombinedTransport(new LocalTransport(localRegistry), serverTransport));
serverBuilder.withTransport(new CombinedServerTransport(new LocalTransport(localRegistry), serverTransport));
}

// Set the server resource state machine.
Expand Down Expand Up @@ -630,26 +667,87 @@ public String toString() {
* Combined server selection strategy.
*/
private static class CombinedSelectionStrategy implements ServerSelectionStrategy {
private final List<Address> addresses;
private final Address address;

private CombinedSelectionStrategy(Address address) {
this.addresses = Collections.singletonList(address);
this.address = address;
}

@Override
public List<Address> selectConnections(Address leader, List<Address> servers) {
List<Address> addresses = new ArrayList<>(servers.size());
addresses.add(address);
Collections.shuffle(servers);
for (Address address : servers) {
if (!address.equals(this.address)) {
addresses.add(address);
}
}
return addresses;
}
}

/**
* Combined client transport.
*/
private static class CombinedClientTransport implements Transport {
private final Address address;
private final Transport local;
private final Transport remote;

private CombinedClientTransport(Address address, Transport local, Transport remote) {
this.address = address;
this.local = local;
this.remote = remote;
}

@Override
public Client client() {
return new CombinedClient(address, local.client(), remote.client());
}

@Override
public Server server() {
return remote.server();
}
}

/**
* Combined client,
*/
private static class CombinedClient implements Client {
private final Address address;
private final Client local;
private final Client remote;

private CombinedClient(Address address, Client local, Client remote) {
this.address = address;
this.local = local;
this.remote = remote;
}

@Override
public CompletableFuture<Connection> connect(Address address) {
if (this.address.equals(address)) {
return local.connect(address);
}
return remote.connect(address);
}

@Override
public CompletableFuture<Void> close() {
return remote.close().thenRun(local::close);
}
}

/**
* Combined transport that aids in the local client communicating directly with the local server.
*/
private static class CombinedTransport implements Transport {
private static class CombinedServerTransport implements Transport {
private final Transport local;
private final Transport remote;

private CombinedTransport(Transport local, Transport remote) {
private CombinedServerTransport(Transport local, Transport remote) {
this.local = local;
this.remote = remote;
}
Expand Down

0 comments on commit c054a8b

Please sign in to comment.