diff --git a/raft/src/main/java/net/kuujo/copycat/raft/RaftConfig.java b/raft/src/main/java/net/kuujo/copycat/raft/RaftConfig.java index 90f4ffe2fa..65e30b1465 100644 --- a/raft/src/main/java/net/kuujo/copycat/raft/RaftConfig.java +++ b/raft/src/main/java/net/kuujo/copycat/raft/RaftConfig.java @@ -31,15 +31,21 @@ * @author Jordan Halterman */ public class RaftConfig extends AbstractConfigurable { - private static final String RESOURCE_ELECTION_TIMEOUT = "election.timeout"; - private static final String RESOURCE_HEARTBEAT_INTERVAL = "heartbeat.interval"; - private static final String RESOURCE_REPLICAS = "replicas"; + private static final String RAFT_ID = "id"; + private static final String RAFT_NAME = "name"; + private static final String RAFT_ELECTION_TIMEOUT = "election.timeout"; + private static final String RAFT_HEARTBEAT_INTERVAL = "heartbeat.interval"; + private static final String RAFT_REPLICAS = "replicas"; private static final String RESOURCE_LOG = "log"; public RaftConfig() { super(); } + public RaftConfig(String resource) { + super(resource); + } + public RaftConfig(Map config) { super(config); } @@ -53,6 +59,81 @@ public RaftConfig copy() { return new RaftConfig(this); } + /** + * Sets the unique identifier of this instance of the Raft algorithm.

+ * + * If the given identifier is {@code null} then a {@link java.util.UUID} based unique identifier will be automatically + * generated. + * + * @param id The unique identifier of this instance of the Raft algorithm. + */ + public void setId(String id) { + if (id != null) { + this.config = config.withValue(RAFT_ID, ConfigValueFactory.fromAnyRef(id)); + } else if (!this.config.hasPath(RAFT_ID)) { + this.config = config.withValue(RAFT_ID, ConfigValueFactory.fromAnyRef(UUID.randomUUID().toString())); + } + } + + /** + * Returns the unique identifier of this instance of the Raft algorithm.

+ * + * If the unique ID for this instance has not yet been set, a {@link java.util.UUID} based identifier will be generated. + * + * @return The unique identifier of this instance of the Raft algorithm. + */ + public String getId() { + if (!config.hasPath(RAFT_ID)) { + setId(null); + } + return config.getString(RAFT_ID); + } + + /** + * Sets the unique identifier of this instance of the Raft algorithm, returning the configuration for method chaining.

+ * + * If the given identifier is {@code null} then a {@link java.util.UUID} based unique identifier will be automatically + * generated. + * + * @param id The unique identifier of this instance of the Raft algorithm. + * @return The Raft configuration. + */ + public RaftConfig withId(String id) { + setId(id); + return this; + } + + /** + * Sets the Raft algorithm name. + * + * @param name The algorithm name. + * @throws java.lang.NullPointerException If the name is {@code null} + */ + public void setName(String name) { + this.config = config.withValue(RAFT_NAME, ConfigValueFactory.fromAnyRef(Assert.isNotNull(name, "name"))); + } + + /** + * Returns the Raft algorithm name. + * + * @return The Raft algorithm name. + * @throws java.lang.NullPointerException If the algorithm name has not been configured. + */ + public String getName() { + return config.getString(RAFT_NAME); + } + + /** + * Sets the Raft algorithm name, returning the configuration for method chaining. + * + * @param name The Raft algorithm name. + * @return The Raft configuration. + */ + public RaftConfig withName(String name) { + setName(name); + return this; + } + /** * Sets the resource election timeout. * @@ -60,7 +141,7 @@ public RaftConfig copy() { * @throws java.lang.IllegalArgumentException If the election timeout is not positive */ public void setElectionTimeout(long electionTimeout) { - this.config = config.withValue(RESOURCE_ELECTION_TIMEOUT, ConfigValueFactory.fromAnyRef(Assert.arg(electionTimeout, electionTimeout > 0, "election timeout must be positive"))); + this.config = config.withValue(RAFT_ELECTION_TIMEOUT, ConfigValueFactory.fromAnyRef(Assert.arg(electionTimeout, electionTimeout > 0, "election timeout must be positive"))); } /** @@ -80,7 +161,7 @@ public void setElectionTimeout(long electionTimeout, TimeUnit unit) { * @return The resource election timeout in milliseconds. */ public long getElectionTimeout() { - return config.getLong(RESOURCE_ELECTION_TIMEOUT); + return config.getLong(RAFT_ELECTION_TIMEOUT); } /** @@ -115,7 +196,7 @@ public RaftConfig withElectionTimeout(long electionTimeout, TimeUnit unit) { * @throws java.lang.IllegalArgumentException If the heartbeat interval is not positive */ public void setHeartbeatInterval(long heartbeatInterval) { - this.config = config.withValue(RESOURCE_HEARTBEAT_INTERVAL, ConfigValueFactory.fromAnyRef(Assert.arg(heartbeatInterval, heartbeatInterval > 0, "heartbeat interval must be positive"))); + this.config = config.withValue(RAFT_HEARTBEAT_INTERVAL, ConfigValueFactory.fromAnyRef(Assert.arg(heartbeatInterval, heartbeatInterval > 0, "heartbeat interval must be positive"))); } /** @@ -135,7 +216,7 @@ public void setHeartbeatInterval(long heartbeatInterval, TimeUnit unit) { * @return The interval at which nodes send heartbeats to each other. */ public long getHeartbeatInterval() { - return config.getLong(RESOURCE_HEARTBEAT_INTERVAL); + return config.getLong(RAFT_HEARTBEAT_INTERVAL); } /** @@ -180,7 +261,7 @@ public void setReplicas(String... replicas) { * @throws java.lang.NullPointerException If {@code replicas} is {@code null} */ public void setReplicas(Collection replicas) { - this.config = config.withValue(RESOURCE_REPLICAS, ConfigValueFactory.fromIterable(new HashSet<>(Assert.isNotNull(replicas, "replicas")))); + this.config = config.withValue(RAFT_REPLICAS, ConfigValueFactory.fromIterable(new HashSet<>(Assert.isNotNull(replicas, "replicas")))); } /** @@ -190,7 +271,7 @@ public void setReplicas(Collection replicas) { */ @SuppressWarnings({ "rawtypes", "unchecked" }) public Set getReplicas() { - return new HashSet(config.hasPath(RESOURCE_REPLICAS) ? (List) config.getList(RESOURCE_REPLICAS).unwrapped() : new ArrayList<>(0)); + return new HashSet(config.hasPath(RAFT_REPLICAS) ? (List) config.getList(RAFT_REPLICAS).unwrapped() : new ArrayList<>(0)); } /** @@ -225,10 +306,10 @@ public RaftConfig withReplicas(Collection replicas) { * @throws java.lang.NullPointerException If {@code replica} is {@code null} */ public RaftConfig addReplica(String replica) { - if (!config.hasPath(RESOURCE_REPLICAS)) { - this.config = config.withValue(RESOURCE_REPLICAS, ConfigValueFactory.fromIterable(new ArrayList(1))); + if (!config.hasPath(RAFT_REPLICAS)) { + this.config = config.withValue(RAFT_REPLICAS, ConfigValueFactory.fromIterable(new ArrayList(1))); } - ConfigList replicas = config.getList(RESOURCE_REPLICAS); + ConfigList replicas = config.getList(RAFT_REPLICAS); replicas.add(ConfigValueFactory.fromAnyRef(Assert.isNotNull(replica, "replica"))); return this; } @@ -241,7 +322,7 @@ public RaftConfig addReplica(String replica) { * @throws java.lang.NullPointerException If {@code replica} is {@code null} */ public RaftConfig removeReplica(String replica) { - ConfigList replicas = config.getList(RESOURCE_REPLICAS); + ConfigList replicas = config.getList(RAFT_REPLICAS); replicas.remove(ConfigValueFactory.fromAnyRef(Assert.isNotNull(replica, "replica"))); return this; } @@ -252,7 +333,7 @@ public RaftConfig removeReplica(String replica) { * @return The resource configuration. */ public RaftConfig clearReplicas() { - config.withoutPath(RESOURCE_REPLICAS); + config.withoutPath(RAFT_REPLICAS); return this; } diff --git a/raft/src/main/java/net/kuujo/copycat/raft/RaftContext.java b/raft/src/main/java/net/kuujo/copycat/raft/RaftContext.java index 09fcd35bf5..3d3c860542 100644 --- a/raft/src/main/java/net/kuujo/copycat/raft/RaftContext.java +++ b/raft/src/main/java/net/kuujo/copycat/raft/RaftContext.java @@ -17,6 +17,7 @@ import net.kuujo.copycat.log.LogManager; import net.kuujo.copycat.raft.protocol.*; +import net.kuujo.copycat.util.concurrent.NamedThreadFactory; import net.kuujo.copycat.util.internal.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +26,7 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; /** @@ -65,15 +67,23 @@ public class RaftContext extends Observable implements RaftProtocol { private long heartbeatInterval = 250; private volatile boolean open; - public RaftContext(String name, String uri, RaftConfig config, ScheduledExecutorService executor) { + public RaftContext(String resource) { + this(new RaftConfig(resource)); + } + + public RaftContext(RaftConfig config) { + this(config, Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(config.getName()))); + } + + public RaftContext(RaftConfig config, ScheduledExecutorService executor) { this.executor = executor; this.config = config; - this.localMember = new RaftMember(Assert.isNotNull(uri, "uri"), config.getReplicas().contains(uri) ? RaftMember.Type.PROMOTABLE : RaftMember.Type.PASSIVE, RaftMember.Status.ALIVE); + this.localMember = new RaftMember(config.getId(), config.getReplicas().contains(config.getId()) ? RaftMember.Type.PROMOTABLE : RaftMember.Type.PASSIVE, RaftMember.Status.ALIVE); members.put(localMember.uri(), localMember); config.getReplicas().forEach(r -> { members.put(r, new RaftMember(r, RaftMember.Type.ACTIVE, RaftMember.Status.ALIVE)); }); - this.log = config.getLog().getLogManager(name); + this.log = config.getLog().getLogManager(config.getName()); this.electionTimeout = config.getElectionTimeout(); this.heartbeatInterval = config.getHeartbeatInterval(); try {