Skip to content

Commit

Permalink
Support caching AtomicDocumentTree primitive.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Aug 8, 2018
1 parent b68909b commit e44becf
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 22 deletions.
Expand Up @@ -17,7 +17,7 @@
package io.atomix.core.tree; package io.atomix.core.tree;


import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.atomix.primitive.PrimitiveBuilder; import io.atomix.core.cache.CachedPrimitiveBuilder;
import io.atomix.primitive.PrimitiveManagementService; import io.atomix.primitive.PrimitiveManagementService;
import io.atomix.primitive.protocol.PrimitiveProtocol; import io.atomix.primitive.protocol.PrimitiveProtocol;
import io.atomix.primitive.protocol.ProxyCompatibleBuilder; import io.atomix.primitive.protocol.ProxyCompatibleBuilder;
Expand All @@ -31,7 +31,7 @@
* Builder for {@link AtomicDocumentTree}. * Builder for {@link AtomicDocumentTree}.
*/ */
public abstract class AtomicDocumentTreeBuilder<V> public abstract class AtomicDocumentTreeBuilder<V>
extends PrimitiveBuilder<AtomicDocumentTreeBuilder<V>, AtomicDocumentTreeConfig, AtomicDocumentTree<V>> extends CachedPrimitiveBuilder<AtomicDocumentTreeBuilder<V>, AtomicDocumentTreeConfig, AtomicDocumentTree<V>>
implements ProxyCompatibleBuilder<AtomicDocumentTreeBuilder<V>> { implements ProxyCompatibleBuilder<AtomicDocumentTreeBuilder<V>> {


protected AtomicDocumentTreeBuilder(String name, AtomicDocumentTreeConfig config, PrimitiveManagementService managementService) { protected AtomicDocumentTreeBuilder(String name, AtomicDocumentTreeConfig config, PrimitiveManagementService managementService) {
Expand Down
Expand Up @@ -15,16 +15,16 @@
*/ */
package io.atomix.core.tree; package io.atomix.core.tree;


import io.atomix.core.cache.CachedPrimitiveConfig;
import io.atomix.primitive.PrimitiveType; import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.config.PrimitiveConfig;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;


/** /**
* Document tree configuration. * Document tree configuration.
*/ */
public class AtomicDocumentTreeConfig extends PrimitiveConfig<AtomicDocumentTreeConfig> { public class AtomicDocumentTreeConfig extends CachedPrimitiveConfig<AtomicDocumentTreeConfig> {
private Class<?> nodeType; private Class<?> nodeType;
private List<Class<?>> extraTypes = new ArrayList<>(); private List<Class<?>> extraTypes = new ArrayList<>();
private boolean registrationRequired = false; private boolean registrationRequired = false;
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache; import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.core.cache.CacheConfig;
import io.atomix.core.tree.AsyncAtomicDocumentTree; import io.atomix.core.tree.AsyncAtomicDocumentTree;
import io.atomix.core.tree.AtomicDocumentTree; import io.atomix.core.tree.AtomicDocumentTree;
import io.atomix.core.tree.DocumentPath; import io.atomix.core.tree.DocumentPath;
Expand All @@ -43,33 +44,23 @@
* Caching asynchronous document tree. * Caching asynchronous document tree.
*/ */
public class CachingAsyncAtomicDocumentTree<V> extends DelegatingAsyncAtomicDocumentTree<V> implements AsyncAtomicDocumentTree<V> { public class CachingAsyncAtomicDocumentTree<V> extends DelegatingAsyncAtomicDocumentTree<V> implements AsyncAtomicDocumentTree<V> {
private static final int DEFAULT_CACHE_SIZE = 10000;
private final Logger log = getLogger(getClass()); private final Logger log = getLogger(getClass());


private final LoadingCache<DocumentPath, CompletableFuture<Versioned<V>>> cache; private final LoadingCache<DocumentPath, CompletableFuture<Versioned<V>>> cache;
private final DocumentTreeEventListener<V> cacheUpdater; private final DocumentTreeEventListener<V> cacheUpdater;
private final Consumer<PrimitiveState> stateListener; private final Consumer<PrimitiveState> stateListener;
private final Map<DocumentTreeEventListener<V>, InternalListener<V>> eventListeners = new ConcurrentHashMap<>(); private final Map<DocumentTreeEventListener<V>, InternalListener<V>> eventListeners = new ConcurrentHashMap<>();


/**
* Default constructor.
*
* @param backingTree a distributed, strongly consistent map for backing
*/
public CachingAsyncAtomicDocumentTree(AsyncAtomicDocumentTree<V> backingTree) {
this(backingTree, DEFAULT_CACHE_SIZE);
}

/** /**
* Constructor to configure cache size. * Constructor to configure cache size.
* *
* @param backingTree a distributed, strongly consistent map for backing * @param backingTree a distributed, strongly consistent map for backing
* @param cacheSize the maximum size of the cache * @param cacheConfig the cache configuration
*/ */
public CachingAsyncAtomicDocumentTree(AsyncAtomicDocumentTree<V> backingTree, int cacheSize) { public CachingAsyncAtomicDocumentTree(AsyncAtomicDocumentTree<V> backingTree, CacheConfig cacheConfig) {
super(backingTree); super(backingTree);
cache = CacheBuilder.newBuilder() cache = CacheBuilder.newBuilder()
.maximumSize(cacheSize) .maximumSize(cacheConfig.getSize())
.build(CacheLoader.from(CachingAsyncAtomicDocumentTree.super::get)); .build(CacheLoader.from(CachingAsyncAtomicDocumentTree.super::get));
cacheUpdater = event -> { cacheUpdater = event -> {
if (!event.newValue().isPresent()) { if (!event.newValue().isPresent()) {
Expand Down
Expand Up @@ -40,13 +40,17 @@ public DefaultAtomicDocumentTreeBuilder(String name, AtomicDocumentTreeConfig co
public CompletableFuture<AtomicDocumentTree<V>> buildAsync() { public CompletableFuture<AtomicDocumentTree<V>> buildAsync() {
return newProxy(DocumentTreeService.class, new ServiceConfig()) return newProxy(DocumentTreeService.class, new ServiceConfig())
.thenCompose(proxy -> new AtomicDocumentTreeProxy(proxy, managementService.getPrimitiveRegistry()).connect()) .thenCompose(proxy -> new AtomicDocumentTreeProxy(proxy, managementService.getPrimitiveRegistry()).connect())
.thenApply(tree -> { .thenApply(treeProxy -> {
Serializer serializer = serializer(); Serializer serializer = serializer();
return new TranscodingAsyncAtomicDocumentTree<V, byte[]>( AsyncAtomicDocumentTree<V> tree = new TranscodingAsyncAtomicDocumentTree<>(
tree, treeProxy,
key -> serializer.encode(key), key -> serializer.encode(key),
bytes -> serializer.decode(bytes)) bytes -> serializer.decode(bytes));
.sync();
if (config.getCacheConfig().isEnabled()) {
tree = new CachingAsyncAtomicDocumentTree<V>(tree, config.getCacheConfig());
}
return tree.sync();
}); });
} }
} }

0 comments on commit e44becf

Please sign in to comment.