Skip to content

Commit

Permalink
Refactor cluster configuration to allow local member URIs to be confi…
Browse files Browse the repository at this point in the history
…gured in file configurations.
  • Loading branch information
kuujo committed Feb 4, 2015
1 parent e58fe12 commit 812adfc
Show file tree
Hide file tree
Showing 41 changed files with 285 additions and 218 deletions.
109 changes: 66 additions & 43 deletions README.md

Large diffs are not rendered by default.

15 changes: 6 additions & 9 deletions api/src/main/java/net/kuujo/copycat/Copycat.java
Expand Up @@ -39,33 +39,30 @@ public interface Copycat extends Managed<Copycat> {
/**
* Creates a new Copycat instance with the default Copycat and cluster configuration.
*
* @param uri The local member URI.
* @return The Copycat instance.
*/
static Copycat create(String uri) {
return create(uri, new CopycatConfig().withClusterConfig(new ClusterConfig()));
static Copycat create() {
return create(new CopycatConfig().withClusterConfig(new ClusterConfig()));
}

/**
* Creates a new Copycat instance, overriding the default cluster configuration.
*
* @param uri The local member URI.
* @param cluster The global cluster configuration.
* @return The Copycat instance.
*/
static Copycat create(String uri, ClusterConfig cluster) {
return create(uri, new CopycatConfig().withClusterConfig(cluster));
static Copycat create(ClusterConfig cluster) {
return create(new CopycatConfig().withClusterConfig(cluster));
}

/**
* Creates a new Copycat instance.
*
* @param uri The local member URI.
* @param config The global Copycat configuration.
* @return The Copycat instance.
*/
static Copycat create(String uri, CopycatConfig config) {
return new DefaultCopycat(uri, config);
static Copycat create(CopycatConfig config) {
return new DefaultCopycat(config);
}

/**
Expand Down
16 changes: 15 additions & 1 deletion api/src/main/java/net/kuujo/copycat/CopycatConfig.java
Expand Up @@ -37,7 +37,6 @@
public class CopycatConfig extends AbstractConfigurable {
public static final String COPYCAT_NAME = "name";
public static final String COPYCAT_DEFAULT_SERIALIZER = "serializer";
public static final String COPYCAT_DEFAULT_EXECUTOR = "executor";
public static final String COPYCAT_CLUSTER = "cluster";

private static final String DEFAULT_CONFIGURATION = "copycat-default";
Expand Down Expand Up @@ -253,4 +252,19 @@ public CoordinatorConfig resolve() {
.withClusterConfig(getClusterConfig());
}

@Override
public String toString() {
return String.format("%s[%s]", getClass().getSimpleName(), config.root().unwrapped());
}

@Override
public boolean equals(Object object) {
return object instanceof CopycatConfig && ((CopycatConfig) object).config.equals(config);
}

@Override
public int hashCode() {
return 17 * config.root().unwrapped().hashCode();
}

}
Expand Up @@ -41,8 +41,8 @@ public class DefaultCopycat implements Copycat {
private final ClusterCoordinator coordinator;
private final CopycatConfig config;

public DefaultCopycat(String uri, CopycatConfig config) {
this.coordinator = new DefaultClusterCoordinator(uri, config.resolve());
public DefaultCopycat(CopycatConfig config) {
this.coordinator = new DefaultClusterCoordinator(config.resolve());
this.config = config;
}

Expand Down
Expand Up @@ -42,11 +42,10 @@ public interface AsyncAtomicBoolean extends AsyncAtomicBooleanProxy, Resource<As
* configurations will be loaded according to namespaces as well; for example, `booleans.conf`.
*
* @param name The asynchronous atomic boolean name.
* @param uri The asynchronous atomic boolean member URI.
* @return The asynchronous atomic boolean.
*/
static AsyncAtomicBoolean create(String name, String uri) {
return create(name, uri, new ClusterConfig(), new AsyncAtomicBooleanConfig());
static AsyncAtomicBoolean create(String name) {
return create(name, new ClusterConfig(String.format("%s-cluster", name)), new AsyncAtomicBooleanConfig(name));
}

/**
Expand All @@ -59,30 +58,27 @@ static AsyncAtomicBoolean create(String name, String uri) {
* configurations will be loaded according to namespaces as well; for example, `booleans.conf`.
*
* @param name The asynchronous atomic boolean name.
* @param uri The asynchronous atomic boolean member URI.
* @param cluster The cluster configuration.
* @return The asynchronous atomic boolean.
*/
static AsyncAtomicBoolean create(String name, String uri, ClusterConfig cluster) {
return create(name, uri, cluster, new AsyncAtomicBooleanConfig());
static AsyncAtomicBoolean create(String name, ClusterConfig cluster) {
return create(name, cluster, new AsyncAtomicBooleanConfig(name));
}

/**
* Creates a new asynchronous atomic boolean.
*
* @param name The asynchronous atomic boolean name.
* @param uri The asynchronous atomic boolean member URI.
* @param cluster The cluster configuration.
* @param config The atomic boolean configuration.
* @return The asynchronous atomic boolean.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
static AsyncAtomicBoolean create(String name, String uri, ClusterConfig cluster, AsyncAtomicBooleanConfig config) {
ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster));
AsyncAtomicBoolean reference = coordinator.getResource(name, config.resolve(cluster));
((Resource) reference).addStartupTask(() -> coordinator.open().thenApply(v -> null));
((Resource) reference).addShutdownTask(coordinator::close);
return reference;
static AsyncAtomicBoolean create(String name, ClusterConfig cluster, AsyncAtomicBooleanConfig config) {
ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster));
return coordinator.<AsyncAtomicBoolean>getResource(name, config.resolve(cluster))
.addStartupTask(() -> coordinator.open().thenApply(v -> null))
.addShutdownTask(coordinator::close);
}

}
Expand Up @@ -20,6 +20,7 @@
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
import net.kuujo.copycat.collections.AsyncCollectionConfig;
import net.kuujo.copycat.state.StateLogConfig;
import net.kuujo.copycat.util.internal.Assert;

import java.util.Map;

Expand Down Expand Up @@ -55,6 +56,7 @@ public AsyncAtomicBooleanConfig copy() {

@Override
public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members");
return new StateLogConfig(toMap())
.resolve(cluster)
.withResourceType(DefaultAsyncAtomicBoolean.class);
Expand Down
Expand Up @@ -42,11 +42,10 @@ public interface AsyncAtomicLong extends AsyncAtomicLongProxy, Resource<AsyncAto
* configurations will be loaded according to namespaces as well; for example, `longs.conf`.
*
* @param name The asynchronous atomic long name.
* @param uri The asynchronous atomic long member URI.
* @return The asynchronous atomic long.
*/
static AsyncAtomicLong create(String name, String uri) {
return create(name, uri, new ClusterConfig(), new AsyncAtomicLongConfig());
static AsyncAtomicLong create(String name) {
return create(name, new ClusterConfig(String.format("%s-cluster", name)), new AsyncAtomicLongConfig(name));
}

/**
Expand All @@ -59,30 +58,27 @@ static AsyncAtomicLong create(String name, String uri) {
* configurations will be loaded according to namespaces as well; for example, `longs.conf`.
*
* @param name The asynchronous atomic long name.
* @param uri The asynchronous atomic long member URI.
* @param cluster The cluster configuration.
* @return The asynchronous atomic long.
*/
static AsyncAtomicLong create(String name, String uri, ClusterConfig cluster) {
return create(name, uri, cluster, new AsyncAtomicLongConfig());
static AsyncAtomicLong create(String name, ClusterConfig cluster) {
return create(name, cluster, new AsyncAtomicLongConfig(name));
}

/**
* Creates a new asynchronous atomic long.
*
* @param name The asynchronous atomic long name.
* @param uri The asynchronous atomic long member URI.
* @param cluster The cluster configuration.
* @param config The atomic long configuration.
* @return The asynchronous atomic long.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
static AsyncAtomicLong create(String name, String uri, ClusterConfig cluster, AsyncAtomicLongConfig config) {
ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster));
AsyncAtomicLong reference = coordinator.getResource(name, config.resolve(cluster));
((Resource) reference).addStartupTask(() -> coordinator.open().thenApply(v -> null));
((Resource) reference).addShutdownTask(coordinator::close);
return reference;
static AsyncAtomicLong create(String name, ClusterConfig cluster, AsyncAtomicLongConfig config) {
ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster));
return coordinator.<AsyncAtomicLong>getResource(name, config.resolve(cluster))
.addStartupTask(() -> coordinator.open().thenApply(v -> null))
.addShutdownTask(coordinator::close);
}

}
Expand Up @@ -20,6 +20,7 @@
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
import net.kuujo.copycat.collections.AsyncCollectionConfig;
import net.kuujo.copycat.state.StateLogConfig;
import net.kuujo.copycat.util.internal.Assert;

import java.util.Map;

Expand Down Expand Up @@ -55,6 +56,7 @@ public AsyncAtomicLongConfig copy() {

@Override
public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members");
return new StateLogConfig(toMap())
.resolve(cluster)
.withResourceType(DefaultAsyncAtomicLong.class);
Expand Down
Expand Up @@ -42,12 +42,11 @@ public interface AsyncAtomicReference<T> extends AsyncAtomicReferenceProxy<T>, R
* configurations will be loaded according to namespaces as well; for example, `references.conf`.
*
* @param name The asynchronous atomic reference name.
* @param uri The asynchronous atomic reference member URI.
* @param <T> The atomic reference data type.
* @return The asynchronous atomic reference.
*/
static <T> AsyncAtomicReference<T> create(String name, String uri) {
return create(name, uri, new ClusterConfig(), new AsyncAtomicReferenceConfig());
static <T> AsyncAtomicReference<T> create(String name) {
return create(name, new ClusterConfig(String.format("%s-cluster", name)), new AsyncAtomicReferenceConfig(name));
}

/**
Expand All @@ -60,32 +59,29 @@ static <T> AsyncAtomicReference<T> create(String name, String uri) {
* configurations will be loaded according to namespaces as well; for example, `references.conf`.
*
* @param name The asynchronous atomic reference name.
* @param uri The asynchronous atomic reference member URI.
* @param cluster The cluster configuration.
* @param <T> The atomic reference data type.
* @return The asynchronous atomic reference.
*/
static <T> AsyncAtomicReference<T> create(String name, String uri, ClusterConfig cluster) {
return create(name, uri, cluster, new AsyncAtomicReferenceConfig());
static <T> AsyncAtomicReference<T> create(String name, ClusterConfig cluster) {
return create(name, cluster, new AsyncAtomicReferenceConfig(name));
}

/**
* Creates a new asynchronous atomic reference.
*
* @param name The asynchronous atomic reference name.
* @param uri The asynchronous atomic reference member URI.
* @param cluster The cluster configuration.
* @param config The atomic reference configuration.
* @param <T> The atomic reference data type.
* @return The asynchronous atomic reference.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
static <T> AsyncAtomicReference<T> create(String name, String uri, ClusterConfig cluster, AsyncAtomicReferenceConfig config) {
ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster));
AsyncAtomicReference<T> reference = coordinator.getResource(name, config.resolve(cluster));
((Resource) reference).addStartupTask(() -> coordinator.open().thenApply(v -> null));
((Resource) reference).addShutdownTask(coordinator::close);
return reference;
static <T> AsyncAtomicReference<T> create(String name, ClusterConfig cluster, AsyncAtomicReferenceConfig config) {
ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster));
return coordinator.<AsyncAtomicReference<T>>getResource(name, config.resolve(cluster))
.addStartupTask(() -> coordinator.open().thenApply(v -> null))
.addShutdownTask(coordinator::close);
}

}
Expand Up @@ -20,6 +20,7 @@
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
import net.kuujo.copycat.collections.AsyncCollectionConfig;
import net.kuujo.copycat.state.StateLogConfig;
import net.kuujo.copycat.util.internal.Assert;

import java.util.Map;

Expand Down Expand Up @@ -55,6 +56,7 @@ public AsyncAtomicReferenceConfig copy() {

@Override
public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members");
return new StateLogConfig(toMap())
.resolve(cluster)
.withResourceType(DefaultAsyncAtomicReference.class);
Expand Down
Expand Up @@ -19,7 +19,6 @@
import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
import net.kuujo.copycat.resource.internal.AbstractResource;

/**
* Asynchronous list.
Expand All @@ -44,12 +43,11 @@ public interface AsyncList<T> extends AsyncCollection<AsyncList<T>, T>, AsyncLis
* configurations will be loaded according to namespaces as well; for example, `lists.conf`.
*
* @param name The asynchronous list name.
* @param uri The asynchronous list member URI.
* @param <T> The list data type.
* @return The asynchronous list.
*/
static <T> AsyncList<T> create(String name, String uri) {
return create(name, uri, new ClusterConfig(), new AsyncListConfig());
static <T> AsyncList<T> create(String name) {
return create(name, new ClusterConfig(String.format("%s-cluster", name)), new AsyncListConfig(name));
}

/**
Expand All @@ -62,32 +60,29 @@ static <T> AsyncList<T> create(String name, String uri) {
* configurations will be loaded according to namespaces as well; for example, `lists.conf`.
*
* @param name The asynchronous list name.
* @param uri The asynchronous list member URI.
* @param cluster The cluster configuration.
* @param <T> The list data type.
* @return The asynchronous list.
*/
static <T> AsyncList<T> create(String name, String uri, ClusterConfig cluster) {
return create(name, uri, cluster, new AsyncListConfig());
static <T> AsyncList<T> create(String name, ClusterConfig cluster) {
return create(name, cluster, new AsyncListConfig(name));
}

/**
* Creates a new asynchronous list.
*
* @param name The asynchronous list name.
* @param uri The asynchronous list member URI.
* @param cluster The cluster configuration.
* @param config The list configuration.
* @param <T> The list data type.
* @return The asynchronous list.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
static <T> AsyncList<T> create(String name, String uri, ClusterConfig cluster, AsyncListConfig config) {
ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster));
AsyncList<T> list = coordinator.getResource(name, config.resolve(cluster));
((AbstractResource) list).addStartupTask(() -> coordinator.open().thenApply(v -> null));
((AbstractResource) list).addShutdownTask(coordinator::close);
return list;
static <T> AsyncList<T> create(String name, ClusterConfig cluster, AsyncListConfig config) {
ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster));
return coordinator.<AsyncList<T>>getResource(name, config.resolve(cluster))
.addStartupTask(() -> coordinator.open().thenApply(v -> null))
.addShutdownTask(coordinator::close);
}

}
Expand Up @@ -19,6 +19,7 @@
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
import net.kuujo.copycat.collections.internal.collection.DefaultAsyncList;
import net.kuujo.copycat.state.StateLogConfig;
import net.kuujo.copycat.util.internal.Assert;

import java.util.Map;

Expand Down Expand Up @@ -54,6 +55,7 @@ public AsyncListConfig copy() {

@Override
public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members");
return new StateLogConfig(toMap())
.resolve(cluster)
.withResourceType(DefaultAsyncList.class);
Expand Down

0 comments on commit 812adfc

Please sign in to comment.