Skip to content
Permalink
Browse files
Cluster builder should be provided to the factory and expose state
Patch by David Capwell, reviewed by Alex Petrov for CASSANDRA-15733.
  • Loading branch information
dcapwell authored and ifesdjeen committed Apr 16, 2020
1 parent 7ddfe52 commit 50fdfefa11248e7b93507b8e66322dc7a5056744
Showing 2 changed files with 67 additions and 65 deletions.
@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -36,30 +37,60 @@

import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;

public abstract class Builder<I extends IInstance, C extends ICluster>
public abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B extends AbstractBuilder<I, C, B>>
{

private final int BROADCAST_PORT = 7012;

public interface Factory<I extends IInstance, C extends ICluster>
public interface Factory<I extends IInstance, C extends ICluster, B extends AbstractBuilder<I, C, B>>
{
C newCluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader);
C newCluster(B builder);
}

private final Factory<I, C> factory;
private final Factory<I, C, B> factory;
private int nodeCount;
private int subnet;
private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
private TokenSupplier tokenSupplier;
private File root;
private Versions.Version version;
private Consumer<IInstanceConfig> configUpdater;
private ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();

public Builder(Factory<I, C> factory)
public AbstractBuilder(Factory<I, C, B> factory)
{
this.factory = factory;
}

public int getNodeCount() {
return nodeCount;
}

public int getSubnet() {
return subnet;
}

public Map<Integer, NetworkTopology.DcAndRack> getNodeIdTopology() {
return nodeIdTopology;
}

public TokenSupplier getTokenSupplier() {
return tokenSupplier;
}

public File getRoot() {
return root;
}

public Versions.Version getVersion() {
return version;
}

public Consumer<IInstanceConfig> getConfigUpdater() {
return configUpdater;
}

public ClassLoader getSharedClassLoader() {
return sharedClassLoader;
}

public C start() throws IOException
{
C cluster = createWithoutStarting();
@@ -75,79 +106,50 @@ public C createWithoutStarting() throws IOException
if (nodeCount <= 0)
throw new IllegalStateException("Cluster must have at least one node");

root.mkdirs();

if (nodeIdTopology == null)
{
nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
.collect(Collectors.toMap(nodeId -> nodeId,
nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
}

root.mkdirs();

ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();

List<IInstanceConfig> configs = new ArrayList<>();

// TODO: make token allocation strategy configurable
if (tokenSupplier == null)
tokenSupplier = evenlyDistributedTokens(nodeCount);

for (int i = 0; i < nodeCount; ++i)
{
int nodeNum = i + 1;
configs.add(createInstanceConfig(nodeNum));
}

return factory.newCluster(root, version, configs, sharedClassLoader);
return factory.newCluster((B) this);
}

public IInstanceConfig newInstanceConfig(C cluster)
public B withSharedClassLoader(ClassLoader sharedClassLoader)
{
return createInstanceConfig(cluster.size() + 1);
this.sharedClassLoader = Objects.requireNonNull(sharedClassLoader, "sharedClassLoader");
return (B) this;
}

protected IInstanceConfig createInstanceConfig(int nodeNum)
{
String ipPrefix = "127.0." + subnet + ".";
String seedIp = ipPrefix + "1";
String ipAddress = ipPrefix + nodeNum;
long token = tokenSupplier.token(nodeNum);

NetworkTopology topology = NetworkTopology.build(ipPrefix, BROADCAST_PORT, nodeIdTopology);

IInstanceConfig config = generateConfig(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
if (configUpdater != null)
configUpdater.accept(config);

return config;
}

protected abstract IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp);

public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier)
public B withTokenSupplier(TokenSupplier tokenSupplier)
{
this.tokenSupplier = tokenSupplier;
return this;
return (B) this;
}

public Builder<I, C> withSubnet(int subnet)
public B withSubnet(int subnet)
{
this.subnet = subnet;
return this;
return (B) this;
}

public Builder<I, C> withNodes(int nodeCount)
public B withNodes(int nodeCount)
{
this.nodeCount = nodeCount;
return this;
return (B) this;
}

public Builder<I, C> withDCs(int dcCount)
public B withDCs(int dcCount)
{
return withRacks(dcCount, 1);
}

public Builder<I, C> withRacks(int dcCount, int racksPerDC)
public B withRacks(int dcCount, int racksPerDC)
{
if (nodeCount == 0)
throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder");
@@ -157,7 +159,7 @@ public Builder<I, C> withRacks(int dcCount, int racksPerDC)
return withRacks(dcCount, racksPerDC, nodesPerRack);
}

public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack)
public B withRacks(int dcCount, int racksPerDC, int nodesPerRack)
{
if (nodeIdTopology != null)
throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls");
@@ -181,15 +183,15 @@ public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack)
dcCount, racksPerDC, nodesPerRack, adjustedNodeCount));
nodeCount = adjustedNodeCount;
}
return this;
return (B) this;
}

public Builder<I, C> withDC(String dcName, int nodeCount)
public B withDC(String dcName, int nodeCount)
{
return withRack(dcName, rackName(1), nodeCount);
}

public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack)
public B withRack(String dcName, String rackName, int nodesInRack)
{
if (nodeIdTopology == null)
{
@@ -202,11 +204,11 @@ public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack)
nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName));

nodeCount += nodesInRack;
return this;
return (B) this;
}

// Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
public B withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
{
if (nodeIdTopology.isEmpty())
throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");
@@ -224,25 +226,25 @@ public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack>

this.nodeIdTopology = new HashMap<>(nodeIdTopology);

return this;
return (B) this;
}

public Builder<I, C> withRoot(File root)
public B withRoot(File root)
{
this.root = root;
return this;
return (B) this;
}

public Builder<I, C> withVersion(Versions.Version version)
public B withVersion(Versions.Version version)
{
this.version = version;
return this;
return (B) this;
}

public Builder<I, C> withConfig(Consumer<IInstanceConfig> updater)
public B withConfig(Consumer<IInstanceConfig> updater)
{
this.configUpdater = updater;
return this;
return (B) this;
}

static String dcName(int index)
@@ -34,7 +34,7 @@ public static void beforeClass() throws Throwable
ICluster.setup();
}

public abstract <I extends IInstance, C extends ICluster> Builder<I, C> builder();
public abstract <I extends IInstance, C extends ICluster, B extends AbstractBuilder<I, C, B>> AbstractBuilder<I, C, B> builder();

public static String KEYSPACE = "distributed_test_keyspace";

0 comments on commit 50fdfef

Please sign in to comment.