Skip to content

Commit

Permalink
Update usage of CopycatClient interface.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Dec 30, 2015
1 parent 41369eb commit f5d83a9
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 96 deletions.
5 changes: 3 additions & 2 deletions manager/src/main/java/io/atomix/Atomix.java
Expand Up @@ -430,9 +430,10 @@ public static abstract class Builder extends io.atomix.catalyst.util.Builder<Ato


protected Builder(Collection<Address> members) { protected Builder(Collection<Address> members) {
clientBuilder = CopycatClient.builder(members) clientBuilder = CopycatClient.builder(members)
.withConnectionStrategy(ConnectionStrategies.BACKOFF) .withServerSelectionStrategy(ServerSelectionStrategies.ANY)
.withConnectionStrategy(ConnectionStrategies.FIBONACCI_BACKOFF)
.withRecoveryStrategy(RecoveryStrategies.RECOVER) .withRecoveryStrategy(RecoveryStrategies.RECOVER)
.withSubmissionStrategy(SubmissionStrategies.ANY); .withRetryStrategy(RetryStrategies.FIBONACCI_BACKOFF);
} }


/** /**
Expand Down
24 changes: 13 additions & 11 deletions manager/src/main/java/io/atomix/AtomixReplica.java
Expand Up @@ -20,14 +20,17 @@
import io.atomix.catalyst.util.Assert; import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.ConfigurationException; import io.atomix.catalyst.util.ConfigurationException;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.client.SubmissionStrategy; import io.atomix.copycat.client.ServerSelectionStrategy;
import io.atomix.copycat.server.CopycatServer; import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage; import io.atomix.copycat.server.storage.Storage;
import io.atomix.manager.ResourceManager; import io.atomix.manager.ResourceManager;
import io.atomix.resource.ResourceRegistry; import io.atomix.resource.ResourceRegistry;


import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer; import java.util.function.Consumer;


Expand Down Expand Up @@ -153,19 +156,17 @@ public CompletableFuture<Void> close() {
} }


/** /**
* Combined submission strategy. * Combined server selection strategy.
*/ */
private static class CombinedSubmissionStrategy implements SubmissionStrategy { private static class CombinedSelectionStrategy implements ServerSelectionStrategy {
private final Address address; private final Collection<Address> addresses;


private CombinedSubmissionStrategy(Address address) { private CombinedSelectionStrategy(Address address) {
this.address = address; this.addresses = Collections.singleton(address);
} }


@Override @Override
public List<Address> getConnections(Address leader, List<Address> servers) { public Collection<Address> selectConnections(Address leader, List<Address> servers) {
List<Address> addresses = new ArrayList<>(1);
addresses.add(address);
return addresses; return addresses;
} }
} }
Expand Down Expand Up @@ -433,7 +434,8 @@ public AtomixReplica build() {
// directly through the local server, ensuring we don't incur unnecessary network traffic by // directly through the local server, ensuring we don't incur unnecessary network traffic by
// sending operations to a remote server when a local server is already available in the same JVM.= // sending operations to a remote server when a local server is already available in the same JVM.=
clientBuilder.withTransport(new LocalTransport(localRegistry)) clientBuilder.withTransport(new LocalTransport(localRegistry))
.withSubmissionStrategy(new CombinedSubmissionStrategy(clientAddress)).build(); .withServerSelectionStrategy(new CombinedSelectionStrategy(clientAddress))
.build();


// Construct the underlying CopycatServer. The server should have been configured with a CombinedTransport // Construct the underlying CopycatServer. The server should have been configured with a CombinedTransport
// that facilitates the local client connecting directly to the server. // that facilitates the local client connecting directly to the server.
Expand Down
73 changes: 11 additions & 62 deletions manager/src/main/java/io/atomix/manager/ManagedResourceSession.java
Expand Up @@ -15,7 +15,6 @@
*/ */
package io.atomix.manager; package io.atomix.manager;


import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.Listener; import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.Listeners; import io.atomix.catalyst.util.Listeners;
import io.atomix.copycat.client.session.Session; import io.atomix.copycat.client.session.Session;
Expand All @@ -30,36 +29,31 @@
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
class ManagedResourceSession implements Session { final class ManagedResourceSession implements Session {

/**
* Resource session state.
*/
private enum State {
OPEN,
CLOSED,
EXPIRED
}

private State state = State.OPEN;
private final long resource; private final long resource;
private final Session parent; private final Session parent;
private final Listeners<Session> openListeners = new Listeners<>();
private final Listeners<Session> closeListeners = new Listeners<>();
private final Map<String, Listeners<Object>> eventListeners = new ConcurrentHashMap<>(); private final Map<String, Listeners<Object>> eventListeners = new ConcurrentHashMap<>();


public ManagedResourceSession(long resource, Session parent) { public ManagedResourceSession(long resource, Session parent) {
this.resource = resource; this.resource = resource;
this.parent = parent; this.parent = parent;
parent.onOpen(this::handleOpen);
parent.onClose(this::handleClose);
} }


@Override @Override
public long id() { public long id() {
return resource; return resource;
} }


@Override
public State state() {
return parent.state();
}

@Override
public Listener<State> onStateChange(Consumer<State> callback) {
return parent.onStateChange(callback);
}

@Override @Override
public Session publish(String event) { public Session publish(String event) {
return parent.publish(event, new InstanceEvent<>(resource, null)); return parent.publish(event, new InstanceEvent<>(resource, null));
Expand Down Expand Up @@ -103,49 +97,4 @@ public synchronized Listener onEvent(String event, Consumer listener) {
return listeners.add(listener); return listeners.add(listener);
} }


@Override
public boolean isOpen() {
return state == State.OPEN;
}

/**
* Handles a session open event.
*/
private void handleOpen(Session session) {
state = State.OPEN;
for (Consumer<Session> listener : openListeners) {
listener.accept(this);
}
}

@Override
public Listener<Session> onOpen(Consumer<Session> listener) {
return openListeners.add(Assert.notNull(listener, "listener"));
}

/**
* Handles a session close event.
*/
private void handleClose(Session session) {
state = session.isExpired() ? State.EXPIRED : State.CLOSED;
for (Consumer<Session> listener : closeListeners) {
listener.accept(this);
}
}

@Override
public Listener<Session> onClose(Consumer<Session> listener) {
return closeListeners.add(Assert.notNull(listener, "listener"));
}

@Override
public boolean isClosed() {
return state == State.CLOSED || state == State.EXPIRED;
}

@Override
public boolean isExpired() {
return state == State.EXPIRED;
}

} }
98 changes: 97 additions & 1 deletion manager/src/main/java/io/atomix/resource/InstanceClient.java
Expand Up @@ -18,14 +18,20 @@
import io.atomix.catalyst.serializer.Serializer; import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Transport; import io.atomix.catalyst.transport.Transport;
import io.atomix.catalyst.util.Assert; import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.concurrent.ThreadContext; import io.atomix.catalyst.util.concurrent.ThreadContext;
import io.atomix.copycat.client.Command; import io.atomix.copycat.client.Command;
import io.atomix.copycat.client.Query;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.client.Query;
import io.atomix.copycat.client.session.Session; import io.atomix.copycat.client.session.Session;
import io.atomix.manager.DeleteResource; import io.atomix.manager.DeleteResource;


import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;


/** /**
* Resource context. * Resource context.
Expand All @@ -37,6 +43,8 @@ public class InstanceClient implements CopycatClient {
private final CopycatClient client; private final CopycatClient client;
private final Transport transport; private final Transport transport;
private final InstanceSession session; private final InstanceSession session;
private final Map<String, Set<EventListener>> eventListeners = new ConcurrentHashMap<>();
private final Map<String, Listener<InstanceEvent<?>>> listeners = new ConcurrentHashMap<>();


/** /**
* @throws NullPointerException if {@code client} is null * @throws NullPointerException if {@code client} is null
Expand All @@ -48,6 +56,16 @@ public InstanceClient(long resource, CopycatClient client, Transport transport)
this.session = new InstanceSession(resource, client.session(), client.context()); this.session = new InstanceSession(resource, client.session(), client.context());
} }


@Override
public State state() {
return client.state();
}

@Override
public Listener<State> onStateChange(Consumer<State> callback) {
return client.onStateChange(callback);
}

@Override @Override
public ThreadContext context() { public ThreadContext context() {
return client.context(); return client.context();
Expand Down Expand Up @@ -81,6 +99,44 @@ public <T> CompletableFuture<T> submit(Query<T> query) {
return client.submit(new InstanceQuery<>(resource, query)); return client.submit(new InstanceQuery<>(resource, query));
} }


@Override
public Listener<Void> onEvent(String event, Runnable callback) {
return onEvent(event, v -> callback.run());
}

@Override
@SuppressWarnings("unchecked")
public synchronized <T> Listener<T> onEvent(String event, Consumer<T> listener) {
Assert.notNull(event, "event");
Assert.notNull(listener, "listener");

Set<EventListener> listeners = eventListeners.get(event);
if (listeners == null) {
listeners = new HashSet<>();
eventListeners.put(event, listeners);
this.listeners.put(event, client.onEvent(event, message -> handleEvent(event, message)));
}

EventListener context = new EventListener(event, listener);
listeners.add(context);
return context;
}

/**
* Handles receiving a resource message.
*/
@SuppressWarnings("unchecked")
private void handleEvent(String event, InstanceEvent<?> message) {
if (message.resource() == resource) {
Set<EventListener> listeners = eventListeners.get(event);
if (listeners != null) {
for (EventListener listener : listeners) {
listener.accept(message.message());
}
}
}
}

@Override @Override
public CompletableFuture<CopycatClient> open() { public CompletableFuture<CopycatClient> open() {
return CompletableFuture.completedFuture(this); return CompletableFuture.completedFuture(this);
Expand All @@ -91,6 +147,11 @@ public boolean isOpen() {
return client.isOpen(); return client.isOpen();
} }


@Override
public CompletableFuture<CopycatClient> recover() {
return CompletableFuture.completedFuture(this);
}

@Override @Override
public CompletableFuture<Void> close() { public CompletableFuture<Void> close() {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
Expand All @@ -106,4 +167,39 @@ public String toString() {
return String.format("%s[resource=%d]", getClass().getSimpleName(), resource); return String.format("%s[resource=%d]", getClass().getSimpleName(), resource);
} }


/**
* Receive listener context.
*/
private class EventListener<T> implements Listener<T> {
private final String event;
private final Consumer<T> listener;

private EventListener(String event, Consumer<T> listener) {
this.event = event;
this.listener = listener;
}

@Override
public void accept(T event) {
listener.accept(event);
}

@Override
public void close() {
synchronized (InstanceClient.this) {
Set<EventListener> listeners = eventListeners.get(event);
if (listeners != null) {
listeners.remove(this);
if (listeners.isEmpty()) {
eventListeners.remove(event);
Listener listener = InstanceClient.this.listeners.remove(event);
if (listener != null) {
listener.close();
}
}
}
}
}
}

} }
23 changes: 4 additions & 19 deletions manager/src/main/java/io/atomix/resource/InstanceSession.java
Expand Up @@ -53,13 +53,13 @@ public long id() {
} }


@Override @Override
public boolean isOpen() { public State state() {
return parent.isOpen(); return parent.state();
} }


@Override @Override
public Listener<Session> onOpen(Consumer<Session> listener) { public Listener<State> onStateChange(Consumer<State> callback) {
return parent.onOpen(Assert.notNull(listener, "listener")); return parent.onStateChange(callback);
} }


@Override @Override
Expand Down Expand Up @@ -119,21 +119,6 @@ private void handleEvent(String event, InstanceEvent<?> message) {
} }
} }


@Override
public Listener<Session> onClose(Consumer<Session> listener) {
return parent.onClose(Assert.notNull(listener, "listener"));
}

@Override
public boolean isClosed() {
return parent.isClosed();
}

@Override
public boolean isExpired() {
return parent.isExpired();
}

@Override @Override
public int hashCode() { public int hashCode() {
return parent.hashCode(); return parent.hashCode();
Expand Down
Expand Up @@ -85,7 +85,7 @@ public void publish(Commit<TopicCommands.Publish> commit) {
Iterator<Map.Entry<Long, Commit<TopicCommands.Listen>>> iterator = listeners.entrySet().iterator(); Iterator<Map.Entry<Long, Commit<TopicCommands.Listen>>> iterator = listeners.entrySet().iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
Commit<TopicCommands.Listen> listener = iterator.next().getValue(); Commit<TopicCommands.Listen> listener = iterator.next().getValue();
if (listener.session().isOpen()) { if (listener.session().state() == Session.State.OPEN) {
listener.session().publish("message", commit.operation().message()); listener.session().publish("message", commit.operation().message());
} else { } else {
iterator.remove(); iterator.remove();
Expand Down

0 comments on commit f5d83a9

Please sign in to comment.