Skip to content

Commit

Permalink
Add support for configured primitives loaded via Atomix API.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 6, 2018
1 parent a352599 commit 75a038d
Show file tree
Hide file tree
Showing 37 changed files with 643 additions and 86 deletions.
96 changes: 91 additions & 5 deletions core/src/main/java/io/atomix/core/Atomix.java
Expand Up @@ -32,9 +32,22 @@
import io.atomix.cluster.messaging.ManagedClusterMessagingService; import io.atomix.cluster.messaging.ManagedClusterMessagingService;
import io.atomix.cluster.messaging.impl.DefaultClusterEventingService; import io.atomix.cluster.messaging.impl.DefaultClusterEventingService;
import io.atomix.cluster.messaging.impl.DefaultClusterMessagingService; import io.atomix.cluster.messaging.impl.DefaultClusterMessagingService;
import io.atomix.core.counter.AtomicCounter;
import io.atomix.core.election.LeaderElection;
import io.atomix.core.election.LeaderElector;
import io.atomix.core.generator.AtomicIdGenerator;
import io.atomix.core.generator.impl.IdGeneratorSessionIdService; import io.atomix.core.generator.impl.IdGeneratorSessionIdService;
import io.atomix.core.impl.CorePrimitivesService; import io.atomix.core.impl.CorePrimitivesService;
import io.atomix.core.lock.DistributedLock;
import io.atomix.core.map.AtomicCounterMap;
import io.atomix.core.map.ConsistentMap;
import io.atomix.core.map.ConsistentTreeMap;
import io.atomix.core.multimap.ConsistentMultimap;
import io.atomix.core.queue.WorkQueue;
import io.atomix.core.set.DistributedSet;
import io.atomix.core.transaction.TransactionBuilder; import io.atomix.core.transaction.TransactionBuilder;
import io.atomix.core.tree.DocumentTree;
import io.atomix.core.value.AtomicValue;
import io.atomix.messaging.BroadcastService; import io.atomix.messaging.BroadcastService;
import io.atomix.messaging.ManagedBroadcastService; import io.atomix.messaging.ManagedBroadcastService;
import io.atomix.messaging.ManagedMessagingService; import io.atomix.messaging.ManagedMessagingService;
Expand All @@ -44,6 +57,7 @@
import io.atomix.primitive.DistributedPrimitive; import io.atomix.primitive.DistributedPrimitive;
import io.atomix.primitive.DistributedPrimitiveBuilder; import io.atomix.primitive.DistributedPrimitiveBuilder;
import io.atomix.primitive.PrimitiveConfig; import io.atomix.primitive.PrimitiveConfig;
import io.atomix.primitive.PrimitiveConfigs;
import io.atomix.primitive.PrimitiveType; import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.PrimitiveTypeRegistry; import io.atomix.primitive.PrimitiveTypeRegistry;
import io.atomix.primitive.partition.ManagedPartitionGroup; import io.atomix.primitive.partition.ManagedPartitionGroup;
Expand Down Expand Up @@ -168,11 +182,82 @@ public TransactionBuilder transactionBuilder(String name) {
} }


@Override @Override
public <B extends DistributedPrimitiveBuilder<B, C, P>, C extends PrimitiveConfig, P extends DistributedPrimitive> B primitiveBuilder( public <B extends DistributedPrimitiveBuilder<B, C, P>, C extends PrimitiveConfig<C>, P extends DistributedPrimitive> B primitiveBuilder(
String name, PrimitiveType<B, C, P> primitiveType) { String name,
PrimitiveType<B, C, P> primitiveType) {
return context.primitives.primitiveBuilder(name, primitiveType); return context.primitives.primitiveBuilder(name, primitiveType);
} }


@Override
public <K, V> ConsistentMap<K, V> getConsistentMap(String name) {
return context.primitives.getConsistentMap(name);
}

@Override
public <V> DocumentTree<V> getDocumentTree(String name) {
return context.primitives.getDocumentTree(name);
}

@Override
public <V> ConsistentTreeMap<V> getTreeMap(String name) {
return context.primitives.getTreeMap(name);
}

@Override
public <K, V> ConsistentMultimap<K, V> getConsistentMultimap(String name) {
return context.primitives.getConsistentMultimap(name);
}

@Override
public <K> AtomicCounterMap<K> getAtomicCounterMap(String name) {
return context.primitives.getAtomicCounterMap(name);
}

@Override
public <E> DistributedSet<E> getSet(String name) {
return context.primitives.getSet(name);
}

@Override
public AtomicCounter getAtomicCounter(String name) {
return context.primitives.getAtomicCounter(name);
}

@Override
public AtomicIdGenerator getAtomicIdGenerator(String name) {
return context.primitives.getAtomicIdGenerator(name);
}

@Override
public <V> AtomicValue<V> getAtomicValue(String name) {
return context.primitives.getAtomicValue(name);
}

@Override
public <T> LeaderElection<T> getLeaderElection(String name) {
return context.primitives.getLeaderElection(name);
}

@Override
public <T> LeaderElector<T> getLeaderElector(String name) {
return context.primitives.getLeaderElector(name);
}

@Override
public DistributedLock getLock(String name) {
return context.primitives.getLock(name);
}

@Override
public <E> WorkQueue<E> getWorkQueue(String name) {
return context.primitives.getWorkQueue(name);
}

@Override
public <C extends PrimitiveConfig<C>, P extends DistributedPrimitive> P getPrimitive(String name, PrimitiveType<?, C, P> primitiveType, C primitiveConfig) {
return context.primitives.getPrimitive(name, primitiveType, primitiveConfig);
}

@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Set<String> getPrimitiveNames(PrimitiveType primitiveType) { public Set<String> getPrimitiveNames(PrimitiveType primitiveType) {
Expand All @@ -189,6 +274,7 @@ public Set<String> getPrimitiveNames(PrimitiveType primitiveType) {
* @return a future to be completed once the instance has completed startup * @return a future to be completed once the instance has completed startup
*/ */
@Override @Override
@SuppressWarnings("unchecked")
public synchronized CompletableFuture<Atomix> start() { public synchronized CompletableFuture<Atomix> start() {
if (closeFuture != null) { if (closeFuture != null) {
return Futures.exceptionalFuture(new IllegalStateException("Atomix instance " + return Futures.exceptionalFuture(new IllegalStateException("Atomix instance " +
Expand Down Expand Up @@ -308,8 +394,8 @@ private static Context buildContext(AtomixConfig config) {
ManagedClusterEventingService clusterEventingService = buildClusterEventService(clusterService, messagingService); ManagedClusterEventingService clusterEventingService = buildClusterEventService(clusterService, messagingService);
ManagedPartitionGroup systemPartitionGroup = buildSystemPartitionGroup(config); ManagedPartitionGroup systemPartitionGroup = buildSystemPartitionGroup(config);
ManagedPartitionService partitions = buildPartitionService(config); ManagedPartitionService partitions = buildPartitionService(config);
ManagedPrimitivesService primitives = new CorePrimitivesService(clusterService, clusterMessagingService, clusterEventingService, partitions); ManagedPrimitivesService primitives = new CorePrimitivesService(clusterService, clusterMessagingService, clusterEventingService, partitions, config.getPrimitives());
PrimitiveTypeRegistry primitiveTypes = new PrimitiveTypeRegistry(config.getPrimitives()); PrimitiveTypeRegistry primitiveTypes = new PrimitiveTypeRegistry(config.getPrimitiveTypes());
return new Context( return new Context(
messagingService, messagingService,
broadcastService, broadcastService,
Expand Down Expand Up @@ -755,7 +841,7 @@ public Atomix build() {
ManagedClusterEventingService clusterEventingService = buildClusterEventService(clusterService, messagingService); ManagedClusterEventingService clusterEventingService = buildClusterEventService(clusterService, messagingService);
ManagedPartitionGroup systemPartitionGroup = buildSystemPartitionGroup(); ManagedPartitionGroup systemPartitionGroup = buildSystemPartitionGroup();
ManagedPartitionService partitions = buildPartitionService(); ManagedPartitionService partitions = buildPartitionService();
ManagedPrimitivesService primitives = new CorePrimitivesService(clusterService, clusterMessagingService, clusterEventingService, partitions); ManagedPrimitivesService primitives = new CorePrimitivesService(clusterService, clusterMessagingService, clusterEventingService, partitions, new PrimitiveConfigs());
return new Atomix(new Context( return new Atomix(new Context(
messagingService, messagingService,
broadcastService, broadcastService,
Expand Down
34 changes: 28 additions & 6 deletions core/src/main/java/io/atomix/core/AtomixConfig.java
Expand Up @@ -16,7 +16,8 @@
package io.atomix.core; package io.atomix.core;


import io.atomix.cluster.ClusterConfig; import io.atomix.cluster.ClusterConfig;
import io.atomix.primitive.PrimitiveTypeConfig; import io.atomix.primitive.PrimitiveConfigs;
import io.atomix.primitive.PrimitiveTypeConfigs;
import io.atomix.primitive.partition.PartitionGroupConfig; import io.atomix.primitive.partition.PartitionGroupConfig;
import io.atomix.utils.Config; import io.atomix.utils.Config;


Expand All @@ -32,7 +33,8 @@ public class AtomixConfig implements Config {
private File dataDirectory = new File(System.getProperty("user.dir"), "data"); private File dataDirectory = new File(System.getProperty("user.dir"), "data");
private boolean enableShutdownHook; private boolean enableShutdownHook;
private Collection<PartitionGroupConfig> partitionGroups = new ArrayList<>(); private Collection<PartitionGroupConfig> partitionGroups = new ArrayList<>();
private PrimitiveTypeConfig primitives = new PrimitiveTypeConfig(); private PrimitiveTypeConfigs primitiveTypes = new PrimitiveTypeConfigs();
private PrimitiveConfigs primitives = new PrimitiveConfigs();


/** /**
* Returns the cluster configuration. * Returns the cluster configuration.
Expand Down Expand Up @@ -130,17 +132,37 @@ public AtomixConfig addPartitionGroup(PartitionGroupConfig partitionGroup) {
* *
* @return the primitive type configuration * @return the primitive type configuration
*/ */
public PrimitiveTypeConfig getPrimitives() { public PrimitiveTypeConfigs getPrimitiveTypes() {
return primitives; return primitiveTypes;
} }


/** /**
* Sets the primitive type configuration. * Sets the primitive type configuration.
* *
* @param primitives the primitive type configuration * @param primitiveTypes the primitive type configuration
* @return the Atomix configuration
*/
public AtomixConfig setPrimitiveTypes(PrimitiveTypeConfigs primitiveTypes) {
this.primitiveTypes = primitiveTypes;
return this;
}

/**
* Returns the primitive configurations.
*
* @return the primitive configurations
*/
public PrimitiveConfigs getPrimitives() {
return primitives;
}

/**
* Sets the primitive configurations.
*
* @param primitives the primitive configurations
* @return the Atomix configuration * @return the Atomix configuration
*/ */
public AtomixConfig setPrimitives(PrimitiveTypeConfig primitives) { public AtomixConfig setPrimitives(PrimitiveConfigs primitives) {
this.primitives = primitives; this.primitives = primitives;
return this; return this;
} }
Expand Down

0 comments on commit 75a038d

Please sign in to comment.