Skip to content

Commit

Permalink
Use a DistributedLock internally to coordinate balancing the cluster.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Feb 6, 2016
1 parent 999aa9d commit 84f8737
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 26 deletions.
141 changes: 117 additions & 24 deletions core/src/main/java/io/atomix/AtomixReplica.java
Expand Up @@ -23,6 +23,7 @@
import io.atomix.catalyst.util.Listener; import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.PropertiesReader; import io.atomix.catalyst.util.PropertiesReader;
import io.atomix.catalyst.util.concurrent.ThreadContext; import io.atomix.catalyst.util.concurrent.ThreadContext;
import io.atomix.coordination.DistributedLock;
import io.atomix.copycat.client.Command; import io.atomix.copycat.client.Command;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.client.Query; import io.atomix.copycat.client.Query;
Expand Down Expand Up @@ -186,6 +187,8 @@ public static Builder builder(Address clientAddress, Address serverAddress, Coll


private final ResourceServer server; private final ResourceServer server;
private final ClusterBalancer balancer; private final ClusterBalancer balancer;
private DistributedLock lock;
private boolean locking;


public AtomixReplica(Properties properties) { public AtomixReplica(Properties properties) {
this(builder(properties)); this(builder(properties));
Expand Down Expand Up @@ -225,24 +228,46 @@ private void registerListeners() {
* Balances the cluster. * Balances the cluster.
*/ */
private void balance() { private void balance() {
if (server.server().cluster().member().equals(server.server().cluster().leader())) { if (!locking && server.server().cluster().member().equals(server.server().cluster().leader())) {
balancer.balance(server.server().cluster()); locking = true;
lock.lock()
.thenCompose(v -> balancer.balance(server.server().cluster()))
.whenComplete((r1, e1) -> lock.unlock().whenComplete((r2, e2) -> locking = false));
} }
} }


@Override @Override
public CompletableFuture<Atomix> open() { public CompletableFuture<Atomix> open() {
return server.open() return server.open()
.thenRun(this::registerListeners) .thenRun(this::registerListeners)
.thenCompose(v -> super.open()); .thenCompose(v -> super.open())
.thenCompose(v -> client.get("", DistributedLock.class))
.thenApply(lock -> {
this.lock = lock;
return this;
});
} }


@Override @Override
public CompletableFuture<Void> close() { public CompletableFuture<Void> close() {
return balancer.replace(server.server().cluster()) CompletableFuture<Void> future = new CompletableFuture<>();
.whenComplete((r, e) -> balancer.close()) lock.lock()
.thenCompose(v -> super.close()) .thenCompose(v -> balancer.replace(server.server().cluster()))
.thenCompose(v -> server.close()); .whenComplete((r1, e1) -> {
balancer.close();
lock.close().whenComplete((r2, e2) -> {
super.close().whenComplete((r3, e3) -> {
server.close().whenComplete((r4, e4) -> {
if (e4 == null) {
future.complete(null);
} else {
future.completeExceptionally(e4);
}
});
});
});
});
return future;
} }


/** /**
Expand Down Expand Up @@ -284,7 +309,7 @@ public static class Builder extends io.atomix.catalyst.util.Builder<AtomixReplic
private Builder(Address clientAddress, Address serverAddress, Collection<Address> members) { private Builder(Address clientAddress, Address serverAddress, Collection<Address> members) {
Serializer serializer = new Serializer(new ServiceLoaderTypeResolver()); Serializer serializer = new Serializer(new ServiceLoaderTypeResolver());
this.clientAddress = Assert.notNull(clientAddress, "clientAddress"); this.clientAddress = Assert.notNull(clientAddress, "clientAddress");
this.clientBuilder = CopycatClient.builder(Collections.singleton(clientAddress)).withSerializer(serializer.clone()); this.clientBuilder = CopycatClient.builder(members).withSerializer(serializer.clone());
this.serverBuilder = CopycatServer.builder(clientAddress, serverAddress, members).withSerializer(serializer.clone()); this.serverBuilder = CopycatServer.builder(clientAddress, serverAddress, members).withSerializer(serializer.clone());
this.quorumHint = members.size(); this.quorumHint = members.size();
} }
Expand Down Expand Up @@ -468,17 +493,33 @@ public Builder withSessionTimeout(Duration sessionTimeout) {
return this; return this;
} }


/**
* Builds the replica transports.
*/
private void buildTransport() {
// If no transport was configured by the user, attempt to load the Netty transport.
if (serverTransport == null) {
try {
serverTransport = (Transport) Class.forName("io.atomix.catalyst.transport.NettyTransport").newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new ConfigurationException("transport not configured");
}
}
}

/** /**
* Builds a client for the replica. * Builds a client for the replica.
*/ */
private ResourceClient buildClient() { private ResourceClient buildClient() {
buildTransport();

// Resolve resources. // Resolve resources.
resourceResolver.resolve(registry); resourceResolver.resolve(registry);


// Configure the client and server with a transport that routes all local client communication // Configure the client and server with a transport that routes all local client communication
// directly through the local server, ensuring we don't incur unnecessary network traffic by // directly through the local server, ensuring we don't incur unnecessary network traffic by
// sending operations to a remote server when a local server is already available in the same JVM.= // sending operations to a remote server when a local server is already available in the same JVM.=
clientBuilder.withTransport(new LocalTransport(localRegistry)) clientBuilder.withTransport(new CombinedClientTransport(clientAddress, new LocalTransport(localRegistry), clientTransport != null ? clientTransport : serverTransport))
.withServerSelectionStrategy(new CombinedSelectionStrategy(clientAddress)); .withServerSelectionStrategy(new CombinedSelectionStrategy(clientAddress));
return new ResourceClient(new CombinedCopycatClient(clientBuilder.build(), serverTransport), registry); return new ResourceClient(new CombinedCopycatClient(clientBuilder.build(), serverTransport), registry);
} }
Expand All @@ -487,22 +528,13 @@ private ResourceClient buildClient() {
* Builds a server for the replica. * Builds a server for the replica.
*/ */
private ResourceServer buildServer() { private ResourceServer buildServer() {
// If no transport was configured by the user, attempt to load the Netty transport.
if (serverTransport == null) {
try {
serverTransport = (Transport) Class.forName("io.atomix.catalyst.transport.NettyTransport").newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new ConfigurationException("transport not configured");
}
}

// Construct the underlying CopycatServer. The server should have been configured with a CombinedTransport // Construct the underlying CopycatServer. The server should have been configured with a CombinedTransport
// that facilitates the local client connecting directly to the server. // that facilitates the local client connecting directly to the server.
if (clientTransport != null) { if (clientTransport != null) {
serverBuilder.withClientTransport(new CombinedTransport(new LocalTransport(localRegistry), clientTransport)) serverBuilder.withClientTransport(new CombinedServerTransport(new LocalTransport(localRegistry), clientTransport))
.withServerTransport(serverTransport); .withServerTransport(serverTransport);
} else { } else {
serverBuilder.withTransport(new CombinedTransport(new LocalTransport(localRegistry), serverTransport)); serverBuilder.withTransport(new CombinedServerTransport(new LocalTransport(localRegistry), serverTransport));
} }


// Set the server resource state machine. // Set the server resource state machine.
Expand Down Expand Up @@ -635,26 +667,87 @@ public String toString() {
* Combined server selection strategy. * Combined server selection strategy.
*/ */
private static class CombinedSelectionStrategy implements ServerSelectionStrategy { private static class CombinedSelectionStrategy implements ServerSelectionStrategy {
private final List<Address> addresses; private final Address address;


private CombinedSelectionStrategy(Address address) { private CombinedSelectionStrategy(Address address) {
this.addresses = Collections.singletonList(address); this.address = address;
} }


@Override @Override
public List<Address> selectConnections(Address leader, List<Address> servers) { public List<Address> selectConnections(Address leader, List<Address> servers) {
List<Address> addresses = new ArrayList<>(servers.size());
addresses.add(address);
Collections.shuffle(servers);
for (Address address : servers) {
if (!address.equals(this.address)) {
addresses.add(address);
}
}
return addresses; return addresses;
} }
} }


/**
* Combined client transport.
*/
private static class CombinedClientTransport implements Transport {
private final Address address;
private final Transport local;
private final Transport remote;

private CombinedClientTransport(Address address, Transport local, Transport remote) {
this.address = address;
this.local = local;
this.remote = remote;
}

@Override
public Client client() {
return new CombinedClient(address, local.client(), remote.client());
}

@Override
public Server server() {
return remote.server();
}
}

/**
* Combined client,
*/
private static class CombinedClient implements Client {
private final Address address;
private final Client local;
private final Client remote;

private CombinedClient(Address address, Client local, Client remote) {
this.address = address;
this.local = local;
this.remote = remote;
}

@Override
public CompletableFuture<Connection> connect(Address address) {
if (this.address.equals(address)) {
return local.connect(address);
}
return remote.connect(address);
}

@Override
public CompletableFuture<Void> close() {
return remote.close().thenRun(local::close);
}
}

/** /**
* Combined transport that aids in the local client communicating directly with the local server. * Combined transport that aids in the local client communicating directly with the local server.
*/ */
private static class CombinedTransport implements Transport { private static class CombinedServerTransport implements Transport {
private final Transport local; private final Transport local;
private final Transport remote; private final Transport remote;


private CombinedTransport(Transport local, Transport remote) { private CombinedServerTransport(Transport local, Transport remote) {
this.local = local; this.local = local;
this.remote = remote; this.remote = remote;
} }
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/io/atomix/util/ClusterBalancer.java
Expand Up @@ -181,8 +181,14 @@ private CompletableFuture<Void> replace(Cluster cluster, CompletableFuture<Void>
}; };


Function<Void, CompletableFuture<Void>> demoteFunction = v -> { Function<Void, CompletableFuture<Void>> demoteFunction = v -> {
LOGGER.info("Demoting {} to RESERVE", cluster.member().address()); long passiveCount = cluster.members().stream().filter(m -> m.type() == Member.Type.PASSIVE).count();
return cluster.member().demote(Member.Type.RESERVE); if (passiveCount < quorumHint * backupCount) {
LOGGER.info("Demoting {} to PASSIVE", cluster.member().address());
return cluster.member().demote(Member.Type.PASSIVE);
} else {
LOGGER.info("Demoting {} to RESERVE", cluster.member().address());
return cluster.member().demote(Member.Type.RESERVE);
}
}; };


// If the local member is active, replace it with a passive or reserve member. // If the local member is active, replace it with a passive or reserve member.
Expand Down
20 changes: 20 additions & 0 deletions core/src/test/java/io/atomix/AtomixReplicaTest.java
Expand Up @@ -138,6 +138,26 @@ private void testSubmitQuery(Consistency consistency) throws Throwable {
await(10000); await(10000);
} }


/**
* Tests submitting a command through all nodes.
*/
public void testSubmitAll() throws Throwable {
List<Atomix> replicas = createReplicas(8, 3, 1);

for (Atomix replica : replicas) {
ValueResource resource = replica.get("test", ValueResource.class).get();
resource.set("Hello world!").thenRun(this::resume);
await(10000);
}

ValueResource resource = replicas.get(0).get("test", ValueResource.class).get();
resource.get().thenAccept(result -> {
threadAssertEquals("Hello world!", result);
resume();
});
await(10000);
}

/** /**
* Tests getting a resource and submitting commands. * Tests getting a resource and submitting commands.
*/ */
Expand Down

0 comments on commit 84f8737

Please sign in to comment.