Skip to content

Commit

Permalink
Add primitive registry in PrimitivesService.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 12, 2018
1 parent 0753636 commit 4cba8e2
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 161 deletions.
26 changes: 15 additions & 11 deletions core/src/main/java/io/atomix/core/Atomix.java
Expand Up @@ -58,6 +58,7 @@
import io.atomix.primitive.DistributedPrimitive;
import io.atomix.primitive.DistributedPrimitiveBuilder;
import io.atomix.primitive.PrimitiveConfig;
import io.atomix.primitive.PrimitiveInfo;
import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.PrimitiveTypeRegistry;
import io.atomix.primitive.partition.ManagedPartitionGroup;
Expand Down Expand Up @@ -89,14 +90,14 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.atomix.primitive.partition.PartitionService.SYSTEM_GROUP;

/**
* Atomix!
Expand Down Expand Up @@ -132,7 +133,6 @@ public static Builder builder(File configFile) {
return new Builder(loadConfig(configFile));
}

protected static final String SYSTEM_GROUP_NAME = "system";
protected static final String CORE_GROUP_NAME = "core";
protected static final String DATA_GROUP_NAME = "data";

Expand Down Expand Up @@ -289,9 +289,13 @@ public <C extends PrimitiveConfig<C>, P extends DistributedPrimitive> P getPrimi
}

@Override
@SuppressWarnings("unchecked")
public Set<String> getPrimitiveNames(PrimitiveType primitiveType) {
return context.primitives.getPrimitiveNames(primitiveType);
public Collection<PrimitiveInfo> getPrimitives() {
return context.primitives.getPrimitives();
}

@Override
public Collection<PrimitiveInfo> getPrimitives(PrimitiveType primitiveType) {
return context.primitives.getPrimitives(primitiveType);
}

/**
Expand Down Expand Up @@ -549,12 +553,12 @@ private static ManagedClusterEventingService buildClusterEventService(
*/
private static ManagedPartitionGroup buildSystemPartitionGroup(AtomixConfig config) {
if (config.getClusterConfig().getNodes().stream().anyMatch(node -> node.getType() == Node.Type.CORE)) {
return RaftPartitionGroup.builder(SYSTEM_GROUP_NAME)
return RaftPartitionGroup.builder(SYSTEM_GROUP)
.withNumPartitions(1)
.withDataDirectory(new File(config.getDataDirectory(), SYSTEM_GROUP_NAME))
.withDataDirectory(new File(config.getDataDirectory(), SYSTEM_GROUP))
.build();
} else {
return PrimaryBackupPartitionGroup.builder(SYSTEM_GROUP_NAME)
return PrimaryBackupPartitionGroup.builder(SYSTEM_GROUP)
.withNumPartitions(1)
.build();
}
Expand Down Expand Up @@ -969,12 +973,12 @@ protected ManagedClusterEventingService buildClusterEventService(
*/
protected ManagedPartitionGroup buildSystemPartitionGroup() {
if (nodes.stream().anyMatch(node -> node.type() == Node.Type.CORE)) {
return RaftPartitionGroup.builder(SYSTEM_GROUP_NAME)
return RaftPartitionGroup.builder(SYSTEM_GROUP)
.withNumPartitions(1)
.withDataDirectory(new File(dataDirectory, SYSTEM_GROUP_NAME))
.withDataDirectory(new File(dataDirectory, SYSTEM_GROUP))
.build();
} else {
return PrimaryBackupPartitionGroup.builder(SYSTEM_GROUP_NAME)
return PrimaryBackupPartitionGroup.builder(SYSTEM_GROUP)
.withNumPartitions(1)
.build();
}
Expand Down
127 changes: 9 additions & 118 deletions core/src/main/java/io/atomix/core/PrimitivesService.java
Expand Up @@ -58,10 +58,11 @@
import io.atomix.primitive.DistributedPrimitive;
import io.atomix.primitive.DistributedPrimitiveBuilder;
import io.atomix.primitive.PrimitiveConfig;
import io.atomix.primitive.PrimitiveInfo;
import io.atomix.primitive.PrimitiveProtocol;
import io.atomix.primitive.PrimitiveType;

import java.util.Set;
import java.util.Collection;

/**
* Primitives service.
Expand Down Expand Up @@ -535,128 +536,18 @@ default <B extends DistributedPrimitiveBuilder<B, C, P>, C extends PrimitiveConf
}

/**
* Returns a list of map names.
* Returns a collection of open primitives.
*
* @return a list of map names
* @return a collection of open primitives
*/
default Set<String> getConsistentMapNames() {
return getPrimitiveNames(ConsistentMapType.instance());
}

/**
* Returns a list of document tree names.
*
* @return a list of document tree names
*/
default Set<String> getDocumentTreeNames() {
return getPrimitiveNames(ConsistentTreeMapType.instance());
}

/**
* Returns a list of tree map names.
*
* @return a list of tree map names
*/
default Set<String> getConsistentTreeMapNames() {
return getPrimitiveNames(ConsistentTreeMapType.instance());
}

/**
* Returns a list of multimap names.
*
* @return a list of multimap names
*/
default Set<String> getConsistentMultimapNames() {
return getPrimitiveNames(ConsistentMultimapType.instance());
}

/**
* Returns a list of counter map names.
*
* @return a list of counter map names
*/
default Set<String> getAtomicCounterMapNames() {
return getPrimitiveNames(AtomicCounterMapType.instance());
}

/**
* Returns a list of set names.
*
* @return a list of set names
*/
default Set<String> getSetNames() {
return getPrimitiveNames(DistributedSetType.instance());
}

/**
* Returns a list of counter names.
*
* @return a list of counter names
*/
default Set<String> getAtomicCounterNames() {
return getPrimitiveNames(AtomicCounterType.instance());
}
Collection<PrimitiveInfo> getPrimitives();

/**
* Returns a list of ID generator names.
* Returns a colleciton of open primitives of the given type.
*
* @return a list of ID generator names
*/
default Set<String> getAtomicIdGeneratorNames() {
return getPrimitiveNames(AtomicIdGeneratorType.instance());
}

/**
* Returns a list of atomic value names.
*
* @return a list of atomic value names
*/
default Set<String> getAtomicValueNames() {
return getPrimitiveNames(AtomicValueType.instance());
}

/**
* Returns a list of leader election names.
*
* @return a list of leader election names
*/
default Set<String> getLeaderElectionNames() {
return getPrimitiveNames(LeaderElectionType.instance());
}

/**
* Returns a list of leader elector names.
*
* @return a list of leader elector names
*/
default Set<String> getLeaderElectorNames() {
return getPrimitiveNames(LeaderElectorType.instance());
}

/**
* Returns a list of lock names.
*
* @return a list of lock names
*/
default Set<String> getDistributedLockNames() {
return getPrimitiveNames(DistributedLockType.instance());
}

/**
* Returns a list of work queue names.
*
* @return a list of work queue names
*/
default Set<String> getWorkQueueNames() {
return getPrimitiveNames(WorkQueueType.instance());
}

/**
* Returns a set of primitive names for the given primitive type.
*
* @param primitiveType the primitive type for which to return names
* @return a set of names of the given primitive type
* @param primitiveType the primitive type
* @return a collection of open primitives of the given type
*/
Set<String> getPrimitiveNames(PrimitiveType primitiveType);
Collection<PrimitiveInfo> getPrimitives(PrimitiveType primitiveType);

}
75 changes: 57 additions & 18 deletions core/src/main/java/io/atomix/core/impl/CorePrimitivesService.java
Expand Up @@ -17,8 +17,6 @@

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.atomix.cluster.ClusterService;
import io.atomix.cluster.messaging.ClusterEventingService;
import io.atomix.cluster.messaging.ClusterMessagingService;
Expand All @@ -35,12 +33,15 @@
import io.atomix.core.generator.AtomicIdGeneratorType;
import io.atomix.core.lock.DistributedLock;
import io.atomix.core.lock.DistributedLockType;
import io.atomix.core.map.AsyncConsistentMap;
import io.atomix.core.map.AtomicCounterMap;
import io.atomix.core.map.AtomicCounterMapType;
import io.atomix.core.map.ConsistentMap;
import io.atomix.core.map.ConsistentMapType;
import io.atomix.core.map.ConsistentTreeMap;
import io.atomix.core.map.ConsistentTreeMapType;
import io.atomix.core.map.impl.ConsistentMapProxy;
import io.atomix.core.map.impl.TranscodingAsyncConsistentMap;
import io.atomix.core.multimap.ConsistentMultimap;
import io.atomix.core.multimap.ConsistentMultimapType;
import io.atomix.core.queue.WorkQueue;
Expand All @@ -58,16 +59,22 @@
import io.atomix.primitive.DistributedPrimitive;
import io.atomix.primitive.DistributedPrimitiveBuilder;
import io.atomix.primitive.PrimitiveConfig;
import io.atomix.primitive.PrimitiveException;
import io.atomix.primitive.PrimitiveInfo;
import io.atomix.primitive.PrimitiveManagementService;
import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.partition.PartitionService;
import io.atomix.utils.AtomixRuntimeException;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces;
import io.atomix.utils.serializer.Serializer;
import io.atomix.utils.time.Versioned;

import java.util.List;
import java.util.Set;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

Expand All @@ -78,13 +85,18 @@
*/
public class CorePrimitivesService implements ManagedPrimitivesService {
private static final int CACHE_SIZE = 1000;
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder()
.register(KryoNamespaces.BASIC)
.register(PrimitiveInfo.class)
.build());

private final PrimitiveManagementService managementService;
private final ManagedTransactionService transactionService;
private final AtomixConfig config;
private final Cache<String, DistributedPrimitive> cache = CacheBuilder.newBuilder()
.maximumSize(CACHE_SIZE)
.build();
private AsyncConsistentMap<String, PrimitiveInfo> primitives;
private final AtomicBoolean started = new AtomicBoolean();

public CorePrimitivesService(
Expand Down Expand Up @@ -195,24 +207,51 @@ public <C extends PrimitiveConfig<C>, P extends DistributedPrimitive> P getPrimi
}

@Override
@SuppressWarnings("unchecked")
public Set<String> getPrimitiveNames(PrimitiveType primitiveType) {
return managementService.getPartitionService().getPartitionGroups().stream()
.map(group -> ((List<Set<String>>) Futures.allOf((List) group.getPartitions().stream()
.map(partition -> partition.getPrimitiveClient().getPrimitives(primitiveType))
.collect(Collectors.toList()))
.join())
.stream()
.reduce(Sets::union)
.orElse(ImmutableSet.of()))
.reduce(Sets::union)
.orElse(ImmutableSet.of());
public Collection<PrimitiveInfo> getPrimitives() {
try {
return primitives.values()
.get(DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
.stream()
.map(Versioned::valueOrNull)
.collect(Collectors.toList());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PrimitiveException.Interrupted();
} catch (TimeoutException e) {
throw new PrimitiveException.Timeout();
} catch (ExecutionException e) {
throw new PrimitiveException(e.getCause());
}
}

@Override
public Collection<PrimitiveInfo> getPrimitives(PrimitiveType primitiveType) {
return getPrimitives()
.stream()
.filter(primitive -> primitive.type().equals(primitiveType))
.collect(Collectors.toList());
}

@Override
public CompletableFuture<PrimitivesService> start() {
return transactionService.start()
.thenRun(() -> started.set(true))
.thenCompose(v -> managementService.getPartitionService()
.getSystemPartitionGroup()
.getPartitions()
.iterator()
.next()
.getPrimitiveClient()
.newProxy("primitives", ConsistentMapType.instance())
.connect())
.thenAccept(proxy -> {
this.primitives = new TranscodingAsyncConsistentMap<String, PrimitiveInfo, String, byte[]>(
new ConsistentMapProxy(proxy),
key -> key,
key -> key,
value -> value != null ? SERIALIZER.encode(value) : null,
value -> value != null ? SERIALIZER.decode(value) : null);
started.set(true);
})
.thenApply(v -> this);
}

Expand Down

0 comments on commit 4cba8e2

Please sign in to comment.