diff --git a/core/src/main/java/io/atomix/Atomix.java b/core/src/main/java/io/atomix/Atomix.java index 82d92a2de8..dab1bea670 100644 --- a/core/src/main/java/io/atomix/Atomix.java +++ b/core/src/main/java/io/atomix/Atomix.java @@ -17,6 +17,7 @@ import io.atomix.catalyst.serializer.Serializer; import io.atomix.catalyst.util.Assert; +import io.atomix.catalyst.util.Managed; import io.atomix.catalyst.util.concurrent.ThreadContext; import io.atomix.collections.DistributedMap; import io.atomix.collections.DistributedMultiMap; @@ -52,7 +53,7 @@ * * @author Jordan Halterman */ -public abstract class Atomix implements ResourceManager { +public abstract class Atomix implements ResourceManager, Managed { final ResourceClient client; protected Atomix(ResourceClient client) { @@ -1455,26 +1456,6 @@ public CompletableFuture getResource(String key, Resourc return client.getResource(key, type, config, options); } - @Override - public CompletableFuture open() { - return client.open().thenApply(v -> this); - } - - @Override - public boolean isOpen() { - return client.isOpen(); - } - - @Override - public CompletableFuture close() { - return client.close(); - } - - @Override - public boolean isClosed() { - return client.isClosed(); - } - @Override public String toString() { return getClass().getSimpleName(); diff --git a/core/src/main/java/io/atomix/AtomixClient.java b/core/src/main/java/io/atomix/AtomixClient.java index 050c5f3c38..1f2f0fa04b 100644 --- a/core/src/main/java/io/atomix/AtomixClient.java +++ b/core/src/main/java/io/atomix/AtomixClient.java @@ -20,6 +20,7 @@ import io.atomix.catalyst.transport.Transport; import io.atomix.catalyst.util.Assert; import io.atomix.catalyst.util.PropertiesReader; +import io.atomix.copycat.client.CopycatClient; import io.atomix.manager.ResourceClient; import io.atomix.manager.ResourceServer; import io.atomix.resource.Resource; @@ -29,6 +30,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; /** @@ -155,6 +157,35 @@ public AtomixClient(ResourceClient client) { super(client); } + /** + * Connects the client to the cluster. + * + * @return A completable future to be completed once the client has been connected. + */ + public CompletableFuture connect() { + return client.connect().thenApply(v -> this); + } + + @Override + public CompletableFuture open() { + return connect(); + } + + @Override + public boolean isOpen() { + return client.state() != CopycatClient.State.CLOSED; + } + + @Override + public CompletableFuture close() { + return client.close(); + } + + @Override + public boolean isClosed() { + return client.state() == CopycatClient.State.CLOSED; + } + /** * Builder for programmatically constructing an {@link AtomixClient}. *

diff --git a/core/src/main/java/io/atomix/AtomixReplica.java b/core/src/main/java/io/atomix/AtomixReplica.java index 3f3057d2fd..b88611bf0a 100644 --- a/core/src/main/java/io/atomix/AtomixReplica.java +++ b/core/src/main/java/io/atomix/AtomixReplica.java @@ -406,11 +406,15 @@ private void balance() { } } - @Override - public CompletableFuture open() { - return server.open() + /** + * Starts the replica. + * + * @return A completable future to be completed once the replica has been started. + */ + public CompletableFuture start() { + return server.start() .thenRun(this::registerListeners) - .thenCompose(v -> super.open()) + .thenCompose(v -> client.connect()) .thenCompose(v -> client.getResource("", DistributedLock.class)) .thenApply(lock -> { this.lock = lock; @@ -418,16 +422,20 @@ public CompletableFuture open() { }); } - @Override - public CompletableFuture close() { + /** + * Stops the replica. + * + * @return A completable future to be completed once the replica has been stopped. + */ + public CompletableFuture stop() { CompletableFuture future = new CompletableFuture<>(); lock.lock() .thenCompose(v -> balancer.replace(server.server().cluster())) .whenComplete((r1, e1) -> { balancer.close(); lock.unlock().whenComplete((r2, e2) -> { - super.close().whenComplete((r3, e3) -> { - server.close().whenComplete((r4, e4) -> { + client.close().whenComplete((r3, e3) -> { + server.stop().whenComplete((r4, e4) -> { if (e4 == null) { future.complete(null); } else { @@ -440,6 +448,26 @@ public CompletableFuture close() { return future; } + @Override + public CompletableFuture open() { + return start(); + } + + @Override + public boolean isOpen() { + return server.isRunning() && client.state() != CopycatClient.State.CLOSED; + } + + @Override + public CompletableFuture close() { + return stop(); + } + + @Override + public boolean isClosed() { + return !server.isRunning() || client.state() == CopycatClient.State.CLOSED; + } + /** * Builder for programmatically constructing an {@link AtomixReplica}. *

@@ -998,8 +1026,8 @@ public Listener onEvent(String event, Consumer callback) { } @Override - public CompletableFuture open() { - return client.open(); + public CompletableFuture connect() { + return client.connect(); } @Override @@ -1012,16 +1040,6 @@ public CompletableFuture close() { return client.close(); } - @Override - public boolean isOpen() { - return client.isOpen(); - } - - @Override - public boolean isClosed() { - return client.isClosed(); - } - @Override public String toString() { return client.toString(); diff --git a/core/src/main/java/io/atomix/util/AtomixCopycatClient.java b/core/src/main/java/io/atomix/util/AtomixCopycatClient.java deleted file mode 100644 index 74bc12d3aa..0000000000 --- a/core/src/main/java/io/atomix/util/AtomixCopycatClient.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright 2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ -package io.atomix.util; - -import io.atomix.catalyst.serializer.Serializer; -import io.atomix.catalyst.transport.Transport; -import io.atomix.catalyst.util.Assert; -import io.atomix.catalyst.util.Listener; -import io.atomix.catalyst.util.concurrent.ThreadContext; -import io.atomix.copycat.Command; -import io.atomix.copycat.Query; -import io.atomix.copycat.client.CopycatClient; -import io.atomix.copycat.session.Session; - -import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; - -/** - * A simple {@link CopycatClient} wrapper that exposes a custom {@link Transport}. - * - * @author onStateChange(Consumer consumer) { - return client.onStateChange(consumer); - } - - @Override - public ThreadContext context() { - return client.context(); - } - - @Override - public Transport transport() { - return transport; - } - - @Override - public Serializer serializer() { - return client.serializer(); - } - - @Override - public Session session() { - return client.session(); - } - - @Override - public CompletableFuture submit(Command command) { - return client.submit(command); - } - - @Override - public CompletableFuture submit(Query query) { - return client.submit(query); - } - - @Override - public Listener onEvent(String event, Runnable callback) { - return client.onEvent(event, callback); - } - - @Override - public Listener onEvent(String event, Consumer callback) { - return client.onEvent(event, callback); - } - - @Override - public CompletableFuture open() { - return client.open(); - } - - @Override - public CompletableFuture recover() { - return client.recover(); - } - - @Override - public CompletableFuture close() { - return client.close(); - } - - @Override - public boolean isOpen() { - return client.isOpen(); - } - - @Override - public boolean isClosed() { - return client.isClosed(); - } - - @Override - public String toString() { - return client.toString(); - } - -} diff --git a/manager/src/main/java/io/atomix/manager/ResourceClient.java b/manager/src/main/java/io/atomix/manager/ResourceClient.java index b1bb01c805..60dc476cc2 100644 --- a/manager/src/main/java/io/atomix/manager/ResourceClient.java +++ b/manager/src/main/java/io/atomix/manager/ResourceClient.java @@ -65,7 +65,7 @@ *

* Client lifecycle *

- * When a client is {@link #open() started}, the client will attempt to contact random servers in the provided + * When a client is {@link #connect() connected}, the client will attempt to contact random servers in the provided * {@link Address} list to open a new session. Opening a client session requires only that the client be able to * communicate with at least one server which can communicate with the leader. Once a session has been opened, * the client will periodically send keep-alive requests to the cluster to maintain its session. In the event @@ -263,17 +263,29 @@ private synchronized void close(ResourceInstance instance) { futures.remove(instance.key()); } - @Override - public CompletableFuture open() { - return client.open().thenApply(v -> this); + /** + * Returns the resource client state. + * + * @return The resource client state. + */ + public CopycatClient.State state() { + return client.state(); } - @Override - public boolean isOpen() { - return client.isOpen(); + /** + * Connects the client to the cluster. + * + * @return A completable future to be completed once the client is connected. + */ + public CompletableFuture connect() { + return client.connect().thenApply(v -> this); } - @Override + /** + * Closes the client. + * + * @return A completable future to be completed once the client has been closed. + */ public CompletableFuture close() { CompletableFuture[] futures = new CompletableFuture[instances.size()]; int i = 0; @@ -283,11 +295,6 @@ public CompletableFuture close() { return CompletableFuture.allOf(futures).thenCompose(v -> client.close()); } - @Override - public boolean isClosed() { - return client.isClosed(); - } - @Override public String toString() { return String.format("%s[session=%s]", getClass().getSimpleName(), client.session()); diff --git a/manager/src/main/java/io/atomix/manager/ResourceManager.java b/manager/src/main/java/io/atomix/manager/ResourceManager.java index 3af2e45b5c..94f3742afe 100644 --- a/manager/src/main/java/io/atomix/manager/ResourceManager.java +++ b/manager/src/main/java/io/atomix/manager/ResourceManager.java @@ -16,7 +16,6 @@ package io.atomix.manager; import io.atomix.catalyst.serializer.Serializer; -import io.atomix.catalyst.util.Managed; import io.atomix.catalyst.util.concurrent.ThreadContext; import io.atomix.resource.Resource; import io.atomix.resource.ResourceType; @@ -30,7 +29,7 @@ * @param resource type * @author Jordan Halterman */ -public interface ResourceManager> extends Managed { +public interface ResourceManager> { /** * Returns the Atomix thread context. diff --git a/manager/src/main/java/io/atomix/manager/ResourceServer.java b/manager/src/main/java/io/atomix/manager/ResourceServer.java index 41bec60f32..ded0388fa3 100644 --- a/manager/src/main/java/io/atomix/manager/ResourceServer.java +++ b/manager/src/main/java/io/atomix/manager/ResourceServer.java @@ -20,7 +20,6 @@ import io.atomix.catalyst.transport.Transport; import io.atomix.catalyst.util.Assert; import io.atomix.catalyst.util.ConfigurationException; -import io.atomix.catalyst.util.Managed; import io.atomix.catalyst.util.concurrent.ThreadContext; import io.atomix.copycat.server.CopycatServer; import io.atomix.copycat.server.storage.Storage; @@ -66,7 +65,7 @@ *

* Server lifecycle *

- * When the server is {@link #open() started}, the server will attempt to contact members in the configured + * When the server is {@link #start() started}, the server will attempt to contact members in the configured * startup {@link Address} list. If any of the members are already in an active state, the server will request * to join the cluster. During the process of joining the cluster, the server will notify the current cluster * leader of its existence. If the leader already knows about the joining server, the server will immediately @@ -78,7 +77,7 @@ * * @author recover() { if (state != State.SUSPENDED) @@ -245,11 +240,6 @@ public synchronized CompletableFuture close() { return closeFuture; } - @Override - public boolean isClosed() { - return client.isClosed(); - } - @Override public String toString() { return String.format("%s[resource=%d]", getClass().getSimpleName(), resource); diff --git a/resource/src/main/java/io/atomix/resource/AbstractResource.java b/resource/src/main/java/io/atomix/resource/AbstractResource.java index d00baf13f8..6fc516b17d 100644 --- a/resource/src/main/java/io/atomix/resource/AbstractResource.java +++ b/resource/src/main/java/io/atomix/resource/AbstractResource.java @@ -285,7 +285,7 @@ protected CompletableFuture submit(Query query, ReadConsistency consis @Override @SuppressWarnings("unchecked") public CompletableFuture open() { - return client.open() + return client.connect() .thenCompose(v -> client.submit(new ResourceCommand.Config())) .thenApply(config -> { this.config = new Config(config); diff --git a/testing/src/main/java/io/atomix/testing/AbstractCopycatTest.java b/testing/src/main/java/io/atomix/testing/AbstractCopycatTest.java index bf0f287169..4c13b9b704 100644 --- a/testing/src/main/java/io/atomix/testing/AbstractCopycatTest.java +++ b/testing/src/main/java/io/atomix/testing/AbstractCopycatTest.java @@ -71,7 +71,7 @@ protected void cleanup() { }); servers.stream().forEach(s -> { try { - s.close().join(); + s.stop().join(); } catch (Exception ignore) { } }); @@ -182,7 +182,7 @@ protected List createServers(int live, int total, Resource.Config List servers = new ArrayList<>(); for (int i = 0; i < live; i++) { CopycatServer server = createServer(members.get(i), config); - server.open().thenRun(this::resume); + server.start().thenRun(this::resume); servers.add(server); }