Skip to content

Commit

Permalink
Recursively attempt to reconnect to servers after failures.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jan 24, 2015
1 parent f7e5485 commit 8d3aec3
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 16 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/net/kuujo/copycat/Task.java
Expand Up @@ -17,7 +17,7 @@
import java.io.Serializable;

/**
* Remote task.
* Remote executable task.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
Expand Down
Expand Up @@ -43,7 +43,7 @@ public CoordinatorCluster(int id, ClusterCoordinator coordinator, CopycatStateCo

@Override
protected CoordinatedMember createMember(MemberInfo info) {
AbstractMemberCoordinator memberCoordinator = new DefaultRemoteMemberCoordinator(info, coordinator.config().getClusterConfig().getProtocol(), userExecutor);
AbstractMemberCoordinator memberCoordinator = new DefaultRemoteMemberCoordinator(info, coordinator.config().getClusterConfig().getProtocol(), executor);
try {
memberCoordinator.open().get();
} catch (InterruptedException | ExecutionException e) {
Expand Down
Expand Up @@ -15,23 +15,23 @@
*/
package net.kuujo.copycat.cluster.internal.coordinator;

import net.kuujo.copycat.cluster.internal.*;
import net.kuujo.copycat.resource.Resource;
import net.kuujo.copycat.resource.internal.ResourceContext;
import net.kuujo.copycat.cluster.Cluster;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.MembershipEvent;
import net.kuujo.copycat.cluster.internal.*;
import net.kuujo.copycat.cluster.internal.manager.ClusterManager;
import net.kuujo.copycat.cluster.internal.manager.MemberManager;
import net.kuujo.copycat.resource.internal.CopycatStateContext;
import net.kuujo.copycat.resource.internal.DefaultResourceContext;
import net.kuujo.copycat.util.internal.Assert;
import net.kuujo.copycat.util.concurrent.Futures;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
import net.kuujo.copycat.log.BufferedLog;
import net.kuujo.copycat.protocol.RaftProtocol;
import net.kuujo.copycat.protocol.rpc.Request;
import net.kuujo.copycat.protocol.rpc.Response;
import net.kuujo.copycat.resource.Resource;
import net.kuujo.copycat.resource.internal.CopycatStateContext;
import net.kuujo.copycat.resource.internal.DefaultResourceContext;
import net.kuujo.copycat.resource.internal.ResourceContext;
import net.kuujo.copycat.util.concurrent.Futures;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
import net.kuujo.copycat.util.internal.Assert;
import net.kuujo.copycat.util.serializer.KryoSerializer;
import net.kuujo.copycat.util.serializer.Serializer;

Expand Down Expand Up @@ -66,7 +66,7 @@ public DefaultClusterCoordinator(String uri, CoordinatorConfig config) {
this.members.put(uri, localMember);
for (String member : config.getClusterConfig().getMembers()) {
if (!this.members.containsKey(member)) {
this.members.put(member, new DefaultRemoteMemberCoordinator(new MemberInfo(member, Member.Type.ACTIVE, Member.State.ALIVE), config.getClusterConfig().getProtocol(), Executors.newSingleThreadExecutor(threadFactory)));
this.members.put(member, new DefaultRemoteMemberCoordinator(new MemberInfo(member, Member.Type.ACTIVE, Member.State.ALIVE), config.getClusterConfig().getProtocol(), Executors.newSingleThreadScheduledExecutor(threadFactory)));
}
}

Expand Down
Expand Up @@ -24,7 +24,8 @@
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Default remote member coordinator implementation.
Expand All @@ -33,9 +34,9 @@
*/
public class DefaultRemoteMemberCoordinator extends AbstractMemberCoordinator {
private final ProtocolClient client;
private final Executor executor;
private final ScheduledExecutorService executor;

public DefaultRemoteMemberCoordinator(MemberInfo info, Protocol protocol, Executor executor) {
public DefaultRemoteMemberCoordinator(MemberInfo info, Protocol protocol, ScheduledExecutorService executor) {
super(info);
try {
URI realUri = new URI(info.uri());
Expand Down Expand Up @@ -65,7 +66,32 @@ public CompletableFuture<ByteBuffer> send(String topic, int address, int id, Byt

@Override
public CompletableFuture<MemberCoordinator> open() {
return super.open().thenComposeAsync(v -> client.connect(), executor).thenApply(v -> this);
return super.open().thenComposeAsync(v -> connect(), executor).thenApply(v -> this);
}

/**
* Recursively attempts to connect to the server.
*/
private CompletableFuture<Void> connect() {
return connect(new CompletableFuture<>());
}

/**
* Recursively attempts to connect to the server.
*/
private CompletableFuture<Void> connect(CompletableFuture<Void> future) {
if (isOpen()) {
client.connect().whenComplete((result, error) -> {
if (error == null) {
future.complete(null);
} else {
executor.schedule(() -> connect(future), 100, TimeUnit.MILLISECONDS);
}
});
} else {
future.completeExceptionally(new IllegalStateException("Member closed"));
}
return future;
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/net/kuujo/copycat/log/LogConfig.java
Expand Up @@ -45,7 +45,7 @@ protected LogConfig(Map<String, Object> config) {
super(config);
}

protected LogConfig(Log log) {
protected LogConfig(LogConfig log) {
super(log);
}

Expand Down

0 comments on commit 8d3aec3

Please sign in to comment.