diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Builder.java b/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java similarity index 70% rename from src/main/java/org/apache/cassandra/distributed/shared/Builder.java rename to src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java index b3b7db0..cc341a3 100644 --- a/src/main/java/org/apache/cassandra/distributed/shared/Builder.java +++ b/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java @@ -18,35 +18,31 @@ package org.apache.cassandra.distributed.shared; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.TokenSupplier; + import java.io.File; import java.io.IOException; import java.nio.file.Files; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.apache.cassandra.distributed.api.ICluster; -import org.apache.cassandra.distributed.api.IInstance; -import org.apache.cassandra.distributed.api.IInstanceConfig; -import org.apache.cassandra.distributed.api.TokenSupplier; - import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens; -public abstract class Builder +public abstract class AbstractBuilder> { - - private final int BROADCAST_PORT = 7012; - - public interface Factory + public interface Factory> { - C newCluster(File root, Versions.Version version, List configs, ClassLoader sharedClassLoader); + C newCluster(B builder); } - private final Factory factory; + private final Factory factory; private int nodeCount; private int subnet; private Map nodeIdTopology; @@ -54,12 +50,50 @@ public interface Factory private File root; private Versions.Version version; private Consumer configUpdater; + private ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader(); + private int broadcastPort = 7012; - public Builder(Factory factory) + public AbstractBuilder(Factory factory) { this.factory = factory; } + public int getNodeCount() { + return nodeCount; + } + + public int getSubnet() { + return subnet; + } + + public Map getNodeIdTopology() { + return nodeIdTopology; + } + + public TokenSupplier getTokenSupplier() { + return tokenSupplier; + } + + public File getRoot() { + return root; + } + + public Versions.Version getVersion() { + return version; + } + + public Consumer getConfigUpdater() { + return configUpdater; + } + + public ClassLoader getSharedClassLoader() { + return sharedClassLoader; + } + + public int getBroadcastPort() { + return broadcastPort; + } + public C start() throws IOException { C cluster = createWithoutStarting(); @@ -75,79 +109,55 @@ public C createWithoutStarting() throws IOException if (nodeCount <= 0) throw new IllegalStateException("Cluster must have at least one node"); + root.mkdirs(); + if (nodeIdTopology == null) - { nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed() .collect(Collectors.toMap(nodeId -> nodeId, nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0)))); - } - - root.mkdirs(); - - ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader(); - - List configs = new ArrayList<>(); // TODO: make token allocation strategy configurable if (tokenSupplier == null) tokenSupplier = evenlyDistributedTokens(nodeCount); - for (int i = 0; i < nodeCount; ++i) - { - int nodeNum = i + 1; - configs.add(createInstanceConfig(nodeNum)); - } - - return factory.newCluster(root, version, configs, sharedClassLoader); + return factory.newCluster((B) this); } - public IInstanceConfig newInstanceConfig(C cluster) + public B withSharedClassLoader(ClassLoader sharedClassLoader) { - return createInstanceConfig(cluster.size() + 1); + this.sharedClassLoader = Objects.requireNonNull(sharedClassLoader, "sharedClassLoader"); + return (B) this; } - protected IInstanceConfig createInstanceConfig(int nodeNum) - { - String ipPrefix = "127.0." + subnet + "."; - String seedIp = ipPrefix + "1"; - String ipAddress = ipPrefix + nodeNum; - long token = tokenSupplier.token(nodeNum); - - NetworkTopology topology = NetworkTopology.build(ipPrefix, BROADCAST_PORT, nodeIdTopology); - - IInstanceConfig config = generateConfig(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp); - if (configUpdater != null) - configUpdater.accept(config); - - return config; + public B withBroadcastPort(int broadcastPort) { + this.broadcastPort = broadcastPort; + return (B) this; } - protected abstract IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp); - - public Builder withTokenSupplier(TokenSupplier tokenSupplier) + public B withTokenSupplier(TokenSupplier tokenSupplier) { this.tokenSupplier = tokenSupplier; - return this; + return (B) this; } - public Builder withSubnet(int subnet) + public B withSubnet(int subnet) { this.subnet = subnet; - return this; + return (B) this; } - public Builder withNodes(int nodeCount) + public B withNodes(int nodeCount) { this.nodeCount = nodeCount; - return this; + return (B) this; } - public Builder withDCs(int dcCount) + public B withDCs(int dcCount) { return withRacks(dcCount, 1); } - public Builder withRacks(int dcCount, int racksPerDC) + public B withRacks(int dcCount, int racksPerDC) { if (nodeCount == 0) throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder"); @@ -157,7 +167,7 @@ public Builder withRacks(int dcCount, int racksPerDC) return withRacks(dcCount, racksPerDC, nodesPerRack); } - public Builder withRacks(int dcCount, int racksPerDC, int nodesPerRack) + public B withRacks(int dcCount, int racksPerDC, int nodesPerRack) { if (nodeIdTopology != null) throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls"); @@ -181,15 +191,15 @@ public Builder withRacks(int dcCount, int racksPerDC, int nodesPerRack) dcCount, racksPerDC, nodesPerRack, adjustedNodeCount)); nodeCount = adjustedNodeCount; } - return this; + return (B) this; } - public Builder withDC(String dcName, int nodeCount) + public B withDC(String dcName, int nodeCount) { return withRack(dcName, rackName(1), nodeCount); } - public Builder withRack(String dcName, String rackName, int nodesInRack) + public B withRack(String dcName, String rackName, int nodesInRack) { if (nodeIdTopology == null) { @@ -202,11 +212,11 @@ public Builder withRack(String dcName, String rackName, int nodesInRack) nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName)); nodeCount += nodesInRack; - return this; + return (B) this; } // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount - public Builder withNodeIdTopology(Map nodeIdTopology) + public B withNodeIdTopology(Map nodeIdTopology) { if (nodeIdTopology.isEmpty()) throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId."); @@ -224,25 +234,25 @@ public Builder withNodeIdTopology(Map this.nodeIdTopology = new HashMap<>(nodeIdTopology); - return this; + return (B) this; } - public Builder withRoot(File root) + public B withRoot(File root) { this.root = root; - return this; + return (B) this; } - public Builder withVersion(Versions.Version version) + public B withVersion(Versions.Version version) { this.version = version; - return this; + return (B) this; } - public Builder withConfig(Consumer updater) + public B withConfig(Consumer updater) { this.configUpdater = updater; - return this; + return (B) this; } static String dcName(int index) diff --git a/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java index 3ec5426..adcc306 100644 --- a/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java +++ b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java @@ -34,7 +34,7 @@ public static void beforeClass() throws Throwable ICluster.setup(); } - public abstract Builder builder(); + public abstract > AbstractBuilder builder(); public static String KEYSPACE = "distributed_test_keyspace";