Skip to content

Commit

Permalink
Refactor Raft configuration to simplify construction of RaftContext i…
Browse files Browse the repository at this point in the history
…nstances.
  • Loading branch information
kuujo committed Feb 15, 2015
1 parent c65f671 commit b2c0d27
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 17 deletions.
109 changes: 95 additions & 14 deletions raft/src/main/java/net/kuujo/copycat/raft/RaftConfig.java
Expand Up @@ -31,15 +31,21 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class RaftConfig extends AbstractConfigurable { public class RaftConfig extends AbstractConfigurable {
private static final String RESOURCE_ELECTION_TIMEOUT = "election.timeout"; private static final String RAFT_ID = "id";
private static final String RESOURCE_HEARTBEAT_INTERVAL = "heartbeat.interval"; private static final String RAFT_NAME = "name";
private static final String RESOURCE_REPLICAS = "replicas"; private static final String RAFT_ELECTION_TIMEOUT = "election.timeout";
private static final String RAFT_HEARTBEAT_INTERVAL = "heartbeat.interval";
private static final String RAFT_REPLICAS = "replicas";
private static final String RESOURCE_LOG = "log"; private static final String RESOURCE_LOG = "log";


public RaftConfig() { public RaftConfig() {
super(); super();
} }


public RaftConfig(String resource) {
super(resource);
}

public RaftConfig(Map<String, Object> config) { public RaftConfig(Map<String, Object> config) {
super(config); super(config);
} }
Expand All @@ -53,14 +59,89 @@ public RaftConfig copy() {
return new RaftConfig(this); return new RaftConfig(this);
} }


/**
* Sets the unique identifier of this instance of the Raft algorithm.<p>
*
* If the given identifier is {@code null} then a {@link java.util.UUID} based unique identifier will be automatically
* generated.
*
* @param id The unique identifier of this instance of the Raft algorithm.
*/
public void setId(String id) {
if (id != null) {
this.config = config.withValue(RAFT_ID, ConfigValueFactory.fromAnyRef(id));
} else if (!this.config.hasPath(RAFT_ID)) {
this.config = config.withValue(RAFT_ID, ConfigValueFactory.fromAnyRef(UUID.randomUUID().toString()));
}
}

/**
* Returns the unique identifier of this instance of the Raft algorithm.<p>
*
* If the unique ID for this instance has not yet been set, a {@link java.util.UUID} based identifier will be generated.
*
* @return The unique identifier of this instance of the Raft algorithm.
*/
public String getId() {
if (!config.hasPath(RAFT_ID)) {
setId(null);
}
return config.getString(RAFT_ID);
}

/**
* Sets the unique identifier of this instance of the Raft algorithm, returning the configuration for method chaining.<p>
*
* If the given identifier is {@code null} then a {@link java.util.UUID} based unique identifier will be automatically
* generated.
*
* @param id The unique identifier of this instance of the Raft algorithm.
* @return The Raft configuration.
*/
public RaftConfig withId(String id) {
setId(id);
return this;
}

/**
* Sets the Raft algorithm name.
*
* @param name The algorithm name.
* @throws java.lang.NullPointerException If the name is {@code null}
*/
public void setName(String name) {
this.config = config.withValue(RAFT_NAME, ConfigValueFactory.fromAnyRef(Assert.isNotNull(name, "name")));
}

/**
* Returns the Raft algorithm name.
*
* @return The Raft algorithm name.
* @throws java.lang.NullPointerException If the algorithm name has not been configured.
*/
public String getName() {
return config.getString(RAFT_NAME);
}

/**
* Sets the Raft algorithm name, returning the configuration for method chaining.
*
* @param name The Raft algorithm name.
* @return The Raft configuration.
*/
public RaftConfig withName(String name) {
setName(name);
return this;
}

/** /**
* Sets the resource election timeout. * Sets the resource election timeout.
* *
* @param electionTimeout The resource election timeout in milliseconds. * @param electionTimeout The resource election timeout in milliseconds.
* @throws java.lang.IllegalArgumentException If the election timeout is not positive * @throws java.lang.IllegalArgumentException If the election timeout is not positive
*/ */
public void setElectionTimeout(long electionTimeout) { public void setElectionTimeout(long electionTimeout) {
this.config = config.withValue(RESOURCE_ELECTION_TIMEOUT, ConfigValueFactory.fromAnyRef(Assert.arg(electionTimeout, electionTimeout > 0, "election timeout must be positive"))); this.config = config.withValue(RAFT_ELECTION_TIMEOUT, ConfigValueFactory.fromAnyRef(Assert.arg(electionTimeout, electionTimeout > 0, "election timeout must be positive")));
} }


/** /**
Expand All @@ -80,7 +161,7 @@ public void setElectionTimeout(long electionTimeout, TimeUnit unit) {
* @return The resource election timeout in milliseconds. * @return The resource election timeout in milliseconds.
*/ */
public long getElectionTimeout() { public long getElectionTimeout() {
return config.getLong(RESOURCE_ELECTION_TIMEOUT); return config.getLong(RAFT_ELECTION_TIMEOUT);
} }


/** /**
Expand Down Expand Up @@ -115,7 +196,7 @@ public RaftConfig withElectionTimeout(long electionTimeout, TimeUnit unit) {
* @throws java.lang.IllegalArgumentException If the heartbeat interval is not positive * @throws java.lang.IllegalArgumentException If the heartbeat interval is not positive
*/ */
public void setHeartbeatInterval(long heartbeatInterval) { public void setHeartbeatInterval(long heartbeatInterval) {
this.config = config.withValue(RESOURCE_HEARTBEAT_INTERVAL, ConfigValueFactory.fromAnyRef(Assert.arg(heartbeatInterval, heartbeatInterval > 0, "heartbeat interval must be positive"))); this.config = config.withValue(RAFT_HEARTBEAT_INTERVAL, ConfigValueFactory.fromAnyRef(Assert.arg(heartbeatInterval, heartbeatInterval > 0, "heartbeat interval must be positive")));
} }


/** /**
Expand All @@ -135,7 +216,7 @@ public void setHeartbeatInterval(long heartbeatInterval, TimeUnit unit) {
* @return The interval at which nodes send heartbeats to each other. * @return The interval at which nodes send heartbeats to each other.
*/ */
public long getHeartbeatInterval() { public long getHeartbeatInterval() {
return config.getLong(RESOURCE_HEARTBEAT_INTERVAL); return config.getLong(RAFT_HEARTBEAT_INTERVAL);
} }


/** /**
Expand Down Expand Up @@ -180,7 +261,7 @@ public void setReplicas(String... replicas) {
* @throws java.lang.NullPointerException If {@code replicas} is {@code null} * @throws java.lang.NullPointerException If {@code replicas} is {@code null}
*/ */
public void setReplicas(Collection<String> replicas) { public void setReplicas(Collection<String> replicas) {
this.config = config.withValue(RESOURCE_REPLICAS, ConfigValueFactory.fromIterable(new HashSet<>(Assert.isNotNull(replicas, "replicas")))); this.config = config.withValue(RAFT_REPLICAS, ConfigValueFactory.fromIterable(new HashSet<>(Assert.isNotNull(replicas, "replicas"))));
} }


/** /**
Expand All @@ -190,7 +271,7 @@ public void setReplicas(Collection<String> replicas) {
*/ */
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings({ "rawtypes", "unchecked" })
public Set<String> getReplicas() { public Set<String> getReplicas() {
return new HashSet<String>(config.hasPath(RESOURCE_REPLICAS) ? (List) config.getList(RESOURCE_REPLICAS).unwrapped() : new ArrayList<>(0)); return new HashSet<String>(config.hasPath(RAFT_REPLICAS) ? (List) config.getList(RAFT_REPLICAS).unwrapped() : new ArrayList<>(0));
} }


/** /**
Expand Down Expand Up @@ -225,10 +306,10 @@ public RaftConfig withReplicas(Collection<String> replicas) {
* @throws java.lang.NullPointerException If {@code replica} is {@code null} * @throws java.lang.NullPointerException If {@code replica} is {@code null}
*/ */
public RaftConfig addReplica(String replica) { public RaftConfig addReplica(String replica) {
if (!config.hasPath(RESOURCE_REPLICAS)) { if (!config.hasPath(RAFT_REPLICAS)) {
this.config = config.withValue(RESOURCE_REPLICAS, ConfigValueFactory.fromIterable(new ArrayList<String>(1))); this.config = config.withValue(RAFT_REPLICAS, ConfigValueFactory.fromIterable(new ArrayList<String>(1)));
} }
ConfigList replicas = config.getList(RESOURCE_REPLICAS); ConfigList replicas = config.getList(RAFT_REPLICAS);
replicas.add(ConfigValueFactory.fromAnyRef(Assert.isNotNull(replica, "replica"))); replicas.add(ConfigValueFactory.fromAnyRef(Assert.isNotNull(replica, "replica")));
return this; return this;
} }
Expand All @@ -241,7 +322,7 @@ public RaftConfig addReplica(String replica) {
* @throws java.lang.NullPointerException If {@code replica} is {@code null} * @throws java.lang.NullPointerException If {@code replica} is {@code null}
*/ */
public RaftConfig removeReplica(String replica) { public RaftConfig removeReplica(String replica) {
ConfigList replicas = config.getList(RESOURCE_REPLICAS); ConfigList replicas = config.getList(RAFT_REPLICAS);
replicas.remove(ConfigValueFactory.fromAnyRef(Assert.isNotNull(replica, "replica"))); replicas.remove(ConfigValueFactory.fromAnyRef(Assert.isNotNull(replica, "replica")));
return this; return this;
} }
Expand All @@ -252,7 +333,7 @@ public RaftConfig removeReplica(String replica) {
* @return The resource configuration. * @return The resource configuration.
*/ */
public RaftConfig clearReplicas() { public RaftConfig clearReplicas() {
config.withoutPath(RESOURCE_REPLICAS); config.withoutPath(RAFT_REPLICAS);
return this; return this;
} }


Expand Down
16 changes: 13 additions & 3 deletions raft/src/main/java/net/kuujo/copycat/raft/RaftContext.java
Expand Up @@ -17,6 +17,7 @@


import net.kuujo.copycat.log.LogManager; import net.kuujo.copycat.log.LogManager;
import net.kuujo.copycat.raft.protocol.*; import net.kuujo.copycat.raft.protocol.*;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
import net.kuujo.copycat.util.internal.Assert; import net.kuujo.copycat.util.internal.Assert;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -25,6 +26,7 @@
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;


/** /**
Expand Down Expand Up @@ -65,15 +67,23 @@ public class RaftContext extends Observable implements RaftProtocol {
private long heartbeatInterval = 250; private long heartbeatInterval = 250;
private volatile boolean open; private volatile boolean open;


public RaftContext(String name, String uri, RaftConfig config, ScheduledExecutorService executor) { public RaftContext(String resource) {
this(new RaftConfig(resource));
}

public RaftContext(RaftConfig config) {
this(config, Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(config.getName())));
}

public RaftContext(RaftConfig config, ScheduledExecutorService executor) {
this.executor = executor; this.executor = executor;
this.config = config; this.config = config;
this.localMember = new RaftMember(Assert.isNotNull(uri, "uri"), config.getReplicas().contains(uri) ? RaftMember.Type.PROMOTABLE : RaftMember.Type.PASSIVE, RaftMember.Status.ALIVE); this.localMember = new RaftMember(config.getId(), config.getReplicas().contains(config.getId()) ? RaftMember.Type.PROMOTABLE : RaftMember.Type.PASSIVE, RaftMember.Status.ALIVE);
members.put(localMember.uri(), localMember); members.put(localMember.uri(), localMember);
config.getReplicas().forEach(r -> { config.getReplicas().forEach(r -> {
members.put(r, new RaftMember(r, RaftMember.Type.ACTIVE, RaftMember.Status.ALIVE)); members.put(r, new RaftMember(r, RaftMember.Type.ACTIVE, RaftMember.Status.ALIVE));
}); });
this.log = config.getLog().getLogManager(name); this.log = config.getLog().getLogManager(config.getName());
this.electionTimeout = config.getElectionTimeout(); this.electionTimeout = config.getElectionTimeout();
this.heartbeatInterval = config.getHeartbeatInterval(); this.heartbeatInterval = config.getHeartbeatInterval();
try { try {
Expand Down

0 comments on commit b2c0d27

Please sign in to comment.