Skip to content

Commit

Permalink
Refactor Copycat client/server usage to use updated connect/close/sta…
Browse files Browse the repository at this point in the history
…rt/stop methods.
  • Loading branch information
kuujo committed Mar 7, 2016
1 parent 67bfe9e commit cf5abb9
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 212 deletions.
23 changes: 2 additions & 21 deletions core/src/main/java/io/atomix/Atomix.java
Expand Up @@ -17,6 +17,7 @@


import io.atomix.catalyst.serializer.Serializer; import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.util.Assert; import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.Managed;
import io.atomix.catalyst.util.concurrent.ThreadContext; import io.atomix.catalyst.util.concurrent.ThreadContext;
import io.atomix.collections.DistributedMap; import io.atomix.collections.DistributedMap;
import io.atomix.collections.DistributedMultiMap; import io.atomix.collections.DistributedMultiMap;
Expand Down Expand Up @@ -52,7 +53,7 @@
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public abstract class Atomix implements ResourceManager<Atomix> { public abstract class Atomix implements ResourceManager<Atomix>, Managed<Atomix> {
final ResourceClient client; final ResourceClient client;


protected Atomix(ResourceClient client) { protected Atomix(ResourceClient client) {
Expand Down Expand Up @@ -1455,26 +1456,6 @@ public <T extends Resource> CompletableFuture<T> getResource(String key, Resourc
return client.getResource(key, type, config, options); return client.getResource(key, type, config, options);
} }


@Override
public CompletableFuture<Atomix> open() {
return client.open().thenApply(v -> this);
}

@Override
public boolean isOpen() {
return client.isOpen();
}

@Override
public CompletableFuture<Void> close() {
return client.close();
}

@Override
public boolean isClosed() {
return client.isClosed();
}

@Override @Override
public String toString() { public String toString() {
return getClass().getSimpleName(); return getClass().getSimpleName();
Expand Down
31 changes: 31 additions & 0 deletions core/src/main/java/io/atomix/AtomixClient.java
Expand Up @@ -20,6 +20,7 @@
import io.atomix.catalyst.transport.Transport; import io.atomix.catalyst.transport.Transport;
import io.atomix.catalyst.util.Assert; import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.PropertiesReader; import io.atomix.catalyst.util.PropertiesReader;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.manager.ResourceClient; import io.atomix.manager.ResourceClient;
import io.atomix.manager.ResourceServer; import io.atomix.manager.ResourceServer;
import io.atomix.resource.Resource; import io.atomix.resource.Resource;
Expand All @@ -29,6 +30,7 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors; import java.util.stream.Collectors;


/** /**
Expand Down Expand Up @@ -155,6 +157,35 @@ public AtomixClient(ResourceClient client) {
super(client); super(client);
} }


/**
* Connects the client to the cluster.
*
* @return A completable future to be completed once the client has been connected.
*/
public CompletableFuture<Atomix> connect() {
return client.connect().thenApply(v -> this);
}

@Override
public CompletableFuture<Atomix> open() {
return connect();
}

@Override
public boolean isOpen() {
return client.state() != CopycatClient.State.CLOSED;
}

@Override
public CompletableFuture<Void> close() {
return client.close();
}

@Override
public boolean isClosed() {
return client.state() == CopycatClient.State.CLOSED;
}

/** /**
* Builder for programmatically constructing an {@link AtomixClient}. * Builder for programmatically constructing an {@link AtomixClient}.
* <p> * <p>
Expand Down
58 changes: 38 additions & 20 deletions core/src/main/java/io/atomix/AtomixReplica.java
Expand Up @@ -406,28 +406,36 @@ private void balance() {
} }
} }


@Override /**
public CompletableFuture<Atomix> open() { * Starts the replica.
return server.open() *
* @return A completable future to be completed once the replica has been started.
*/
public CompletableFuture<Atomix> start() {
return server.start()
.thenRun(this::registerListeners) .thenRun(this::registerListeners)
.thenCompose(v -> super.open()) .thenCompose(v -> client.connect())
.thenCompose(v -> client.getResource("", DistributedLock.class)) .thenCompose(v -> client.getResource("", DistributedLock.class))
.thenApply(lock -> { .thenApply(lock -> {
this.lock = lock; this.lock = lock;
return this; return this;
}); });
} }


@Override /**
public CompletableFuture<Void> close() { * Stops the replica.
*
* @return A completable future to be completed once the replica has been stopped.
*/
public CompletableFuture<Void> stop() {
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
lock.lock() lock.lock()
.thenCompose(v -> balancer.replace(server.server().cluster())) .thenCompose(v -> balancer.replace(server.server().cluster()))
.whenComplete((r1, e1) -> { .whenComplete((r1, e1) -> {
balancer.close(); balancer.close();
lock.unlock().whenComplete((r2, e2) -> { lock.unlock().whenComplete((r2, e2) -> {
super.close().whenComplete((r3, e3) -> { client.close().whenComplete((r3, e3) -> {
server.close().whenComplete((r4, e4) -> { server.stop().whenComplete((r4, e4) -> {
if (e4 == null) { if (e4 == null) {
future.complete(null); future.complete(null);
} else { } else {
Expand All @@ -440,6 +448,26 @@ public CompletableFuture<Void> close() {
return future; return future;
} }


@Override
public CompletableFuture<Atomix> open() {
return start();
}

@Override
public boolean isOpen() {
return server.isRunning() && client.state() != CopycatClient.State.CLOSED;
}

@Override
public CompletableFuture<Void> close() {
return stop();
}

@Override
public boolean isClosed() {
return !server.isRunning() || client.state() == CopycatClient.State.CLOSED;
}

/** /**
* Builder for programmatically constructing an {@link AtomixReplica}. * Builder for programmatically constructing an {@link AtomixReplica}.
* <p> * <p>
Expand Down Expand Up @@ -998,8 +1026,8 @@ public <T> Listener<T> onEvent(String event, Consumer<T> callback) {
} }


@Override @Override
public CompletableFuture<CopycatClient> open() { public CompletableFuture<CopycatClient> connect() {
return client.open(); return client.connect();
} }


@Override @Override
Expand All @@ -1012,16 +1040,6 @@ public CompletableFuture<Void> close() {
return client.close(); return client.close();
} }


@Override
public boolean isOpen() {
return client.isOpen();
}

@Override
public boolean isClosed() {
return client.isClosed();
}

@Override @Override
public String toString() { public String toString() {
return client.toString(); return client.toString();
Expand Down
125 changes: 0 additions & 125 deletions core/src/main/java/io/atomix/util/AtomixCopycatClient.java

This file was deleted.

33 changes: 20 additions & 13 deletions manager/src/main/java/io/atomix/manager/ResourceClient.java
Expand Up @@ -65,7 +65,7 @@
* <p> * <p>
* <b>Client lifecycle</b> * <b>Client lifecycle</b>
* <p> * <p>
* When a client is {@link #open() started}, the client will attempt to contact random servers in the provided * When a client is {@link #connect() connected}, the client will attempt to contact random servers in the provided
* {@link Address} list to open a new session. Opening a client session requires only that the client be able to * {@link Address} list to open a new session. Opening a client session requires only that the client be able to
* communicate with at least one server which can communicate with the leader. Once a session has been opened, * communicate with at least one server which can communicate with the leader. Once a session has been opened,
* the client will periodically send keep-alive requests to the cluster to maintain its session. In the event * the client will periodically send keep-alive requests to the cluster to maintain its session. In the event
Expand Down Expand Up @@ -263,17 +263,29 @@ private synchronized void close(ResourceInstance instance) {
futures.remove(instance.key()); futures.remove(instance.key());
} }


@Override /**
public CompletableFuture<ResourceClient> open() { * Returns the resource client state.
return client.open().thenApply(v -> this); *
* @return The resource client state.
*/
public CopycatClient.State state() {
return client.state();
} }


@Override /**
public boolean isOpen() { * Connects the client to the cluster.
return client.isOpen(); *
* @return A completable future to be completed once the client is connected.
*/
public CompletableFuture<ResourceClient> connect() {
return client.connect().thenApply(v -> this);
} }


@Override /**
* Closes the client.
*
* @return A completable future to be completed once the client has been closed.
*/
public CompletableFuture<Void> close() { public CompletableFuture<Void> close() {
CompletableFuture<?>[] futures = new CompletableFuture[instances.size()]; CompletableFuture<?>[] futures = new CompletableFuture[instances.size()];
int i = 0; int i = 0;
Expand All @@ -283,11 +295,6 @@ public CompletableFuture<Void> close() {
return CompletableFuture.allOf(futures).thenCompose(v -> client.close()); return CompletableFuture.allOf(futures).thenCompose(v -> client.close());
} }


@Override
public boolean isClosed() {
return client.isClosed();
}

@Override @Override
public String toString() { public String toString() {
return String.format("%s[session=%s]", getClass().getSimpleName(), client.session()); return String.format("%s[session=%s]", getClass().getSimpleName(), client.session());
Expand Down
3 changes: 1 addition & 2 deletions manager/src/main/java/io/atomix/manager/ResourceManager.java
Expand Up @@ -16,7 +16,6 @@
package io.atomix.manager; package io.atomix.manager;


import io.atomix.catalyst.serializer.Serializer; import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.util.Managed;
import io.atomix.catalyst.util.concurrent.ThreadContext; import io.atomix.catalyst.util.concurrent.ThreadContext;
import io.atomix.resource.Resource; import io.atomix.resource.Resource;
import io.atomix.resource.ResourceType; import io.atomix.resource.ResourceType;
Expand All @@ -30,7 +29,7 @@
* @param <T> resource type * @param <T> resource type
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public interface ResourceManager<T extends ResourceManager<T>> extends Managed<T> { public interface ResourceManager<T extends ResourceManager<T>> {


/** /**
* Returns the Atomix thread context. * Returns the Atomix thread context.
Expand Down

0 comments on commit cf5abb9

Please sign in to comment.