Skip to content

Commit

Permalink
Add distributed tree set implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jul 11, 2018
1 parent fd9cdc4 commit e07bd88
Show file tree
Hide file tree
Showing 58 changed files with 2,524 additions and 470 deletions.
6 changes: 6 additions & 0 deletions core/src/main/java/io/atomix/core/Atomix.java
Expand Up @@ -41,6 +41,7 @@
import io.atomix.core.queue.DistributedQueue;
import io.atomix.core.semaphore.DistributedSemaphore;
import io.atomix.core.set.DistributedSet;
import io.atomix.core.set.DistributedTreeSet;
import io.atomix.core.transaction.TransactionBuilder;
import io.atomix.core.transaction.TransactionService;
import io.atomix.core.tree.AtomicDocumentTree;
Expand Down Expand Up @@ -387,6 +388,11 @@ public <E> DistributedSet<E> getSet(String name) {
return primitives.getSet(name);
}

@Override
public <E extends Comparable<E>> DistributedTreeSet<E> getTreeSet(String name) {
return primitives.getTreeSet(name);
}

@Override
public <E> DistributedQueue<E> getQueue(String name) {
return primitives.getQueue(name);
Expand Down
41 changes: 38 additions & 3 deletions core/src/main/java/io/atomix/core/PrimitivesService.java
Expand Up @@ -21,9 +21,6 @@
import io.atomix.core.counter.AtomicCounter;
import io.atomix.core.counter.AtomicCounterBuilder;
import io.atomix.core.counter.AtomicCounterType;
import io.atomix.core.map.AtomicCounterMap;
import io.atomix.core.map.AtomicCounterMapBuilder;
import io.atomix.core.map.AtomicCounterMapType;
import io.atomix.core.idgenerator.AtomicIdGenerator;
import io.atomix.core.idgenerator.AtomicIdGeneratorBuilder;
import io.atomix.core.idgenerator.AtomicIdGeneratorType;
Expand All @@ -39,6 +36,9 @@
import io.atomix.core.lock.DistributedLock;
import io.atomix.core.lock.DistributedLockBuilder;
import io.atomix.core.lock.DistributedLockType;
import io.atomix.core.map.AtomicCounterMap;
import io.atomix.core.map.AtomicCounterMapBuilder;
import io.atomix.core.map.AtomicCounterMapType;
import io.atomix.core.map.AtomicMap;
import io.atomix.core.map.AtomicMapBuilder;
import io.atomix.core.map.AtomicMapType;
Expand Down Expand Up @@ -69,6 +69,9 @@
import io.atomix.core.set.DistributedSet;
import io.atomix.core.set.DistributedSetBuilder;
import io.atomix.core.set.DistributedSetType;
import io.atomix.core.set.DistributedTreeSet;
import io.atomix.core.set.DistributedTreeSetBuilder;
import io.atomix.core.set.DistributedTreeSetType;
import io.atomix.core.transaction.TransactionBuilder;
import io.atomix.core.tree.AtomicDocumentTree;
import io.atomix.core.tree.AtomicDocumentTreeBuilder;
Expand Down Expand Up @@ -312,6 +315,29 @@ default <E> DistributedSetBuilder<E> setBuilder(String name, PrimitiveProtocol p
return primitiveBuilder(name, DistributedSetType.instance(), protocol);
}

/**
* Creates a new DistributedTreeSetBuilder.
*
* @param name the primitive name
* @param <E> set element type
* @return builder for an distributed set
*/
default <E extends Comparable<E>> DistributedTreeSetBuilder<E> treeSetBuilder(String name) {
return primitiveBuilder(name, DistributedTreeSetType.instance());
}

/**
* Creates a new DistributedTreeSetBuilder.
*
* @param name the primitive name
* @param protocol the primitive protocol
* @param <E> set element type
* @return builder for an distributed set
*/
default <E extends Comparable<E>> DistributedTreeSetBuilder<E> treeSetBuilder(String name, PrimitiveProtocol protocol) {
return primitiveBuilder(name, DistributedTreeSetType.instance(), protocol);
}

/**
* Creates a new DistributedQueueBuilder.
*
Expand Down Expand Up @@ -678,6 +704,15 @@ default TransactionBuilder transactionBuilder() {
*/
<E> DistributedSet<E> getSet(String name);

/**
* Creates a new DistributedTreeSet.
*
* @param name the primitive name
* @param <E> set element type
* @return a multiton instance of a distributed tree set
*/
<E extends Comparable<E>> DistributedTreeSet<E> getTreeSet(String name);

/**
* Creates a new DistributedQueue.
*
Expand Down
Expand Up @@ -40,15 +40,15 @@
/**
* Default distributed collection service.
*/
public abstract class DefaultDistributedCollectionService<T extends Collection<String>>
public abstract class DefaultDistributedCollectionService<T extends Collection<E>, E>
extends AbstractPrimitiveService<DistributedCollectionClient>
implements DistributedCollectionService {
implements DistributedCollectionService<E> {

private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32;
protected static final int MAX_ITERATOR_BATCH_SIZE = 1000;

private final Serializer serializer;
private T collection;
private Map<Long, IteratorContext> iterators = Maps.newHashMap();
protected Map<Long, AbstractIteratorContext> iterators = Maps.newHashMap();
private Set<SessionId> listeners = Sets.newHashSet();

protected DefaultDistributedCollectionService(PrimitiveType primitiveType, T collection) {
Expand Down Expand Up @@ -85,11 +85,11 @@ public void restore(BackupInput input) {
collection = input.readObject();
}

protected void added(String element) {
protected void added(E element) {
listeners.forEach(l -> getSession(l).accept(client -> client.onEvent(new CollectionEvent<>(CollectionEvent.Type.ADD, element))));
}

protected void removed(String element) {
protected void removed(E element) {
listeners.forEach(l -> getSession(l).accept(client -> client.onEvent(new CollectionEvent<>(CollectionEvent.Type.REMOVE, element))));
}

Expand All @@ -109,7 +109,7 @@ public boolean contains(Object o) {
}

@Override
public CollectionUpdateResult<Boolean> add(String element) {
public CollectionUpdateResult<Boolean> add(E element) {
if (collection.add(element)) {
added(element);
return ok(true);
Expand All @@ -118,9 +118,9 @@ public CollectionUpdateResult<Boolean> add(String element) {
}

@Override
public CollectionUpdateResult<Boolean> remove(Object element) {
public CollectionUpdateResult<Boolean> remove(E element) {
if (collection.remove(element)) {
removed((String) element);
removed(element);
return ok(true);
}
return noop(false);
Expand All @@ -132,9 +132,9 @@ public boolean containsAll(Collection<?> c) {
}

@Override
public CollectionUpdateResult<Boolean> addAll(Collection<? extends String> c) {
public CollectionUpdateResult<Boolean> addAll(Collection<? extends E> c) {
boolean changed = false;
for (String element : c) {
for (E element : c) {
if (add(element).status() == CollectionUpdateResult.Status.OK) {
changed = true;
}
Expand All @@ -145,7 +145,7 @@ public CollectionUpdateResult<Boolean> addAll(Collection<? extends String> c) {
@Override
public CollectionUpdateResult<Boolean> retainAll(Collection<?> c) {
boolean changed = false;
for (String element : collection) {
for (E element : collection) {
if (!c.contains(element) && remove(element).status() == CollectionUpdateResult.Status.OK) {
changed = true;
}
Expand All @@ -157,7 +157,7 @@ public CollectionUpdateResult<Boolean> retainAll(Collection<?> c) {
public CollectionUpdateResult<Boolean> removeAll(Collection<?> c) {
boolean changed = false;
for (Object element : c) {
if (remove(element).status() == CollectionUpdateResult.Status.OK) {
if (remove((E) element).status() == CollectionUpdateResult.Status.OK) {
changed = true;
}
}
Expand Down Expand Up @@ -188,22 +188,20 @@ public long iterate() {
}

@Override
public IteratorBatch<String> next(long iteratorId, int position) {
IteratorContext context = iterators.get(iteratorId);
public IteratorBatch<E> next(long iteratorId, int position) {
AbstractIteratorContext context = iterators.get(iteratorId);
if (context == null) {
return null;
}

List<String> elements = new ArrayList<>();
int size = 0;
while (context.iterator.hasNext()) {
context.position++;
if (context.position > position) {
String element = context.iterator.next();
size += element.length();
List<E> elements = new ArrayList<>();
while (context.iterator().hasNext()) {
context.incrementPosition();
if (context.position() > position) {
E element = context.iterator().next();
elements.add(element);

if (size >= MAX_ITERATOR_BATCH_SIZE) {
if (elements.size() >= MAX_ITERATOR_BATCH_SIZE) {
break;
}
}
Expand All @@ -212,7 +210,7 @@ public IteratorBatch<String> next(long iteratorId, int position) {
if (elements.isEmpty()) {
return null;
}
return new IteratorBatch<>(context.position, elements);
return new IteratorBatch<>(context.position(), elements);
}

@Override
Expand All @@ -223,22 +221,54 @@ public void close(long iteratorId) {
@Override
public void onExpire(Session session) {
listeners.remove(session.sessionId());
iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
iterators.entrySet().removeIf(entry -> entry.getValue().sessionId() == session.sessionId().id());
}

@Override
public void onClose(Session session) {
listeners.remove(session.sessionId());
iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
iterators.entrySet().removeIf(entry -> entry.getValue().sessionId() == session.sessionId().id());
}

protected class IteratorContext {
protected abstract class AbstractIteratorContext {
private final long sessionId;
private int position = 0;
private transient Iterator<String> iterator = collection().iterator();
private transient Iterator<E> iterator;

IteratorContext(long sessionId) {
public AbstractIteratorContext(long sessionId) {
this.sessionId = sessionId;
}

protected abstract Iterator<E> create();

public long sessionId() {
return sessionId;
}

public int position() {
return position;
}

public void incrementPosition() {
position++;
}

public Iterator<E> iterator() {
if (iterator == null) {
iterator = create();
}
return iterator;
}
}

protected class IteratorContext extends AbstractIteratorContext {
public IteratorContext(long sessionId) {
super(sessionId);
}

@Override
protected Iterator<E> create() {
return collection().iterator();
}
}
}
Expand Up @@ -21,14 +21,14 @@
/**
* Distributed collection client.
*/
public interface DistributedCollectionClient {
public interface DistributedCollectionClient<E> {

/**
* Called when an event is received by the client.
*
* @param event the collection event
*/
@Event
void onEvent(CollectionEvent<String> event);
void onEvent(CollectionEvent<E> event);

}
Expand Up @@ -35,18 +35,18 @@
/**
* Distributed collection proxy.
*/
public abstract class DistributedCollectionProxy<A extends AsyncDistributedCollection<String>, S extends DistributedCollectionService>
public abstract class DistributedCollectionProxy<A extends AsyncDistributedCollection<E>, S extends DistributedCollectionService<E>, E>
extends AbstractAsyncPrimitive<A, S>
implements AsyncDistributedCollection<String>, DistributedCollectionClient {
implements AsyncDistributedCollection<E>, DistributedCollectionClient<E> {

private final Set<CollectionEventListener<String>> eventListeners = Sets.newIdentityHashSet();
private final Set<CollectionEventListener<E>> eventListeners = Sets.newIdentityHashSet();

public DistributedCollectionProxy(ProxyClient<S> client, PrimitiveRegistry registry) {
super(client, registry);
}

@Override
public void onEvent(CollectionEvent<String> event) {
public void onEvent(CollectionEvent<E> event) {
eventListeners.forEach(l -> l.event(event));
}

Expand All @@ -61,41 +61,41 @@ public CompletableFuture<Boolean> isEmpty() {
}

@Override
public CompletableFuture<Boolean> add(String element) {
public CompletableFuture<Boolean> add(E element) {
return getProxyClient().applyBy(name(), service -> service.add(element))
.thenCompose(result -> checkLocked(result));
}

@Override
public CompletableFuture<Boolean> remove(String element) {
public CompletableFuture<Boolean> remove(E element) {
return getProxyClient().applyBy(name(), service -> service.remove(element))
.thenCompose(result -> checkLocked(result));
}

@Override
public CompletableFuture<Boolean> contains(String element) {
public CompletableFuture<Boolean> contains(E element) {
return getProxyClient().applyBy(name(), service -> service.contains(element));
}

@Override
public CompletableFuture<Boolean> addAll(Collection<? extends String> c) {
public CompletableFuture<Boolean> addAll(Collection<? extends E> c) {
return getProxyClient().applyBy(name(), service -> service.addAll(c))
.thenCompose(result -> checkLocked(result));
}

@Override
public CompletableFuture<Boolean> containsAll(Collection<? extends String> c) {
public CompletableFuture<Boolean> containsAll(Collection<? extends E> c) {
return getProxyClient().applyBy(name(), service -> service.containsAll(c));
}

@Override
public CompletableFuture<Boolean> retainAll(Collection<? extends String> c) {
public CompletableFuture<Boolean> retainAll(Collection<? extends E> c) {
return getProxyClient().applyBy(name(), service -> service.removeAll(c))
.thenCompose(result -> checkLocked(result));
}

@Override
public CompletableFuture<Boolean> removeAll(Collection<? extends String> c) {
public CompletableFuture<Boolean> removeAll(Collection<? extends E> c) {
return getProxyClient().applyBy(name(), service -> service.removeAll(c))
.thenCompose(result -> checkLocked(result));
}
Expand All @@ -108,7 +108,7 @@ protected <T> CompletableFuture<T> checkLocked(CollectionUpdateResult<T> result)
}

@Override
public synchronized CompletableFuture<Void> addListener(CollectionEventListener<String> listener) {
public synchronized CompletableFuture<Void> addListener(CollectionEventListener<E> listener) {
if (eventListeners.isEmpty()) {
eventListeners.add(listener);
return getProxyClient().acceptBy(name(), service -> service.listen()).thenApply(v -> null);
Expand All @@ -119,7 +119,7 @@ public synchronized CompletableFuture<Void> addListener(CollectionEventListener<
}

@Override
public synchronized CompletableFuture<Void> removeListener(CollectionEventListener<String> listener) {
public synchronized CompletableFuture<Void> removeListener(CollectionEventListener<E> listener) {
if (eventListeners.remove(listener) && eventListeners.isEmpty()) {
return getProxyClient().acceptAll(service -> service.unlisten()).thenApply(v -> null);
}
Expand All @@ -131,7 +131,7 @@ private boolean isListening() {
}

@Override
public AsyncIterator<String> iterator() {
public AsyncIterator<E> iterator() {
return new ProxyIterator<>(
getProxyClient(),
getProxyClient().getPartitionId(name()),
Expand Down

0 comments on commit e07bd88

Please sign in to comment.