Skip to content

Commit

Permalink
Implement distributed multiset state machine.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 26, 2018
1 parent ca20061 commit b06d4ff
Show file tree
Hide file tree
Showing 24 changed files with 1,755 additions and 86 deletions.
6 changes: 6 additions & 0 deletions core/src/main/java/io/atomix/core/Atomix.java
Expand Up @@ -29,6 +29,7 @@
import io.atomix.core.atomic.treemap.AtomicTreeMap; import io.atomix.core.atomic.treemap.AtomicTreeMap;
import io.atomix.core.atomic.value.AtomicValue; import io.atomix.core.atomic.value.AtomicValue;
import io.atomix.core.collection.list.DistributedList; import io.atomix.core.collection.list.DistributedList;
import io.atomix.core.collection.multiset.DistributedMultiset;
import io.atomix.core.collection.queue.DistributedQueue; import io.atomix.core.collection.queue.DistributedQueue;
import io.atomix.core.collection.set.DistributedSet; import io.atomix.core.collection.set.DistributedSet;
import io.atomix.core.concurrent.lock.DistributedLock; import io.atomix.core.concurrent.lock.DistributedLock;
Expand Down Expand Up @@ -372,6 +373,11 @@ public <E> DistributedList<E> getList(String name) {
return primitives.getList(name); return primitives.getList(name);
} }


@Override
public <E> DistributedMultiset<E> getMultiset(String name) {
return primitives.getMultiset(name);
}

@Override @Override
public AtomicCounter getAtomicCounter(String name) { public AtomicCounter getAtomicCounter(String name) {
return primitives.getAtomicCounter(name); return primitives.getAtomicCounter(name);
Expand Down
35 changes: 35 additions & 0 deletions core/src/main/java/io/atomix/core/PrimitivesService.java
Expand Up @@ -42,6 +42,9 @@
import io.atomix.core.collection.list.DistributedList; import io.atomix.core.collection.list.DistributedList;
import io.atomix.core.collection.list.DistributedListBuilder; import io.atomix.core.collection.list.DistributedListBuilder;
import io.atomix.core.collection.list.DistributedListType; import io.atomix.core.collection.list.DistributedListType;
import io.atomix.core.collection.multiset.DistributedMultiset;
import io.atomix.core.collection.multiset.DistributedMultisetBuilder;
import io.atomix.core.collection.multiset.DistributedMultisetType;
import io.atomix.core.collection.queue.DistributedQueue; import io.atomix.core.collection.queue.DistributedQueue;
import io.atomix.core.collection.queue.DistributedQueueBuilder; import io.atomix.core.collection.queue.DistributedQueueBuilder;
import io.atomix.core.collection.queue.DistributedQueueType; import io.atomix.core.collection.queue.DistributedQueueType;
Expand Down Expand Up @@ -266,6 +269,29 @@ default <E> DistributedListBuilder<E> listBuilder(String name, PrimitiveProtocol
return primitiveBuilder(name, DistributedListType.instance(), protocol); return primitiveBuilder(name, DistributedListType.instance(), protocol);
} }


/**
* Creates a new DistributedMultisetBuilder.
*
* @param name the primitive name
* @param <E> multiset element type
* @return builder for a distributed multiset
*/
default <E> DistributedMultisetBuilder<E> multisetBuilder(String name) {
return primitiveBuilder(name, DistributedMultisetType.instance());
}

/**
* Creates a new DistributedMultisetBuilder.
*
* @param name the primitive name
* @param protocol the primitive protocol
* @param <E> multiset element type
* @return builder for a distributed multiset
*/
default <E> DistributedMultisetBuilder<E> multisetBuilder(String name, PrimitiveProtocol protocol) {
return primitiveBuilder(name, DistributedMultisetType.instance(), protocol);
}

/** /**
* Creates a new AtomicCounterBuilder. * Creates a new AtomicCounterBuilder.
* *
Expand Down Expand Up @@ -529,6 +555,15 @@ default TransactionBuilder transactionBuilder() {
*/ */
<E> DistributedList<E> getList(String name); <E> DistributedList<E> getList(String name);


/**
* Creates a new DistributedMultiset.
*
* @param name the primitive name
* @param <E> multiset element type
* @return a multiton instance of a distributed multiset
*/
<E> DistributedMultiset<E> getMultiset(String name);

/** /**
* Creates a new AtomicCounterBuilder. * Creates a new AtomicCounterBuilder.
* *
Expand Down
Expand Up @@ -18,6 +18,7 @@


import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import io.atomix.core.atomic.multimap.AsyncAtomicMultimap; import io.atomix.core.atomic.multimap.AsyncAtomicMultimap;
import io.atomix.core.atomic.multimap.AtomicMultimap; import io.atomix.core.atomic.multimap.AtomicMultimap;
import io.atomix.core.atomic.multimap.AtomicMultimapEvent; import io.atomix.core.atomic.multimap.AtomicMultimapEvent;
Expand Down Expand Up @@ -393,6 +394,41 @@ public CompletableFuture<Boolean> contains(String element) {
return containsKey(element); return containsKey(element);
} }


@Override
public CompletableFuture<Integer> count(Object element) {
return get((String) element).thenApply(value -> value == null ? 0 : value.value().size());
}

@Override
public CompletableFuture<Integer> add(String element, int occurrences) {
return Futures.exceptionalFuture(new UnsupportedOperationException());
}

@Override
public CompletableFuture<Integer> remove(Object element, int occurrences) {
return Futures.exceptionalFuture(new UnsupportedOperationException());
}

@Override
public CompletableFuture<Integer> setCount(String element, int count) {
return Futures.exceptionalFuture(new UnsupportedOperationException());
}

@Override
public CompletableFuture<Boolean> setCount(String element, int oldCount, int newCount) {
return Futures.exceptionalFuture(new UnsupportedOperationException());
}

@Override
public AsyncDistributedSet<String> elementSet() {
return new AtomicMultimapKeySet();
}

@Override
public AsyncDistributedSet<Multiset.Entry<String>> entrySet() {
throw new UnsupportedOperationException();
}

@Override @Override
public CompletableFuture<Boolean> addAll(Collection<? extends String> c) { public CompletableFuture<Boolean> addAll(Collection<? extends String> c) {
return Futures.exceptionalFuture(new UnsupportedOperationException()); return Futures.exceptionalFuture(new UnsupportedOperationException());
Expand Down Expand Up @@ -512,6 +548,41 @@ public CompletableFuture<Boolean> contains(byte[] element) {
return containsValue(element); return containsValue(element);
} }


@Override
public CompletableFuture<Integer> count(Object element) {
return Futures.exceptionalFuture(new UnsupportedOperationException());
}

@Override
public CompletableFuture<Integer> add(byte[] element, int occurrences) {
return Futures.exceptionalFuture(new UnsupportedOperationException());
}

@Override
public CompletableFuture<Integer> remove(Object element, int occurrences) {
return Futures.exceptionalFuture(new UnsupportedOperationException());
}

@Override
public CompletableFuture<Integer> setCount(byte[] element, int count) {
return Futures.exceptionalFuture(new UnsupportedOperationException());
}

@Override
public CompletableFuture<Boolean> setCount(byte[] element, int oldCount, int newCount) {
return Futures.exceptionalFuture(new UnsupportedOperationException());
}

@Override
public AsyncDistributedSet<byte[]> elementSet() {
throw new UnsupportedOperationException();
}

@Override
public AsyncDistributedSet<Multiset.Entry<byte[]>> entrySet() {
throw new UnsupportedOperationException();
}

@Override @Override
public CompletableFuture<Boolean> addAll(Collection<? extends byte[]> c) { public CompletableFuture<Boolean> addAll(Collection<? extends byte[]> c) {
return Futures.exceptionalFuture(new UnsupportedOperationException()); return Futures.exceptionalFuture(new UnsupportedOperationException());
Expand Down
Expand Up @@ -164,13 +164,13 @@ public CompletableFuture<A> connect() {
*/ */
private class DistributedCollectionIterator implements AsyncIterator<String> { private class DistributedCollectionIterator implements AsyncIterator<String> {
private final long id; private final long id;
private volatile CompletableFuture<DistributedCollectionService.Batch> batch; private volatile CompletableFuture<DistributedCollectionService.Batch<String>> batch;
private volatile CompletableFuture<Void> closeFuture; private volatile CompletableFuture<Void> closeFuture;


DistributedCollectionIterator(long id) { DistributedCollectionIterator(long id) {
this.id = id; this.id = id;
this.batch = CompletableFuture.completedFuture( this.batch = CompletableFuture.completedFuture(
new DistributedCollectionService.Batch(0, Collections.emptyList())); new DistributedCollectionService.Batch<>(0, Collections.emptyList()));
} }


/** /**
Expand All @@ -194,7 +194,7 @@ private CompletableFuture<Iterator<String>> batch() {
* @param position the position from which to fetch the next batch * @param position the position from which to fetch the next batch
* @return the next batch of entries from the cluster * @return the next batch of entries from the cluster
*/ */
private CompletableFuture<DistributedCollectionService.Batch> fetch(int position) { private CompletableFuture<DistributedCollectionService.Batch<String>> fetch(int position) {
return getProxyClient().applyBy(name(), service -> service.next(id, position)) return getProxyClient().applyBy(name(), service -> service.next(id, position))
.thenCompose(batch -> { .thenCompose(batch -> {
if (batch == null) { if (batch == null) {
Expand Down
Expand Up @@ -259,7 +259,7 @@ public interface DistributedCollectionService {
* @return the next batch of entries for the iterator or {@code null} if the iterator is complete * @return the next batch of entries for the iterator or {@code null} if the iterator is complete
*/ */
@Query @Query
Batch next(long iteratorId, int position); Batch<String> next(long iteratorId, int position);


/** /**
* Closes an iterator. * Closes an iterator.
Expand All @@ -272,12 +272,12 @@ public interface DistributedCollectionService {
/** /**
* Iterator batch. * Iterator batch.
*/ */
final class Batch implements Iterator<String> { final class Batch<T> implements Iterator<T> {
private final int position; private final int position;
private final Collection<String> elements; private final Collection<T> elements;
private transient volatile Iterator<String> iterator; private transient volatile Iterator<T> iterator;


Batch(int position, Collection<String> elements) { public Batch(int position, Collection<T> elements) {
this.position = position; this.position = position;
this.elements = elements; this.elements = elements;
} }
Expand All @@ -296,11 +296,11 @@ public int position() {
* *
* @return the batch of elements * @return the batch of elements
*/ */
public Collection<String> elements() { public Collection<T> elements() {
return elements; return elements;
} }


private Iterator<String> iterator() { private Iterator<T> iterator() {
if (iterator == null) { if (iterator == null) {
synchronized (this) { synchronized (this) {
if (iterator == null) { if (iterator == null) {
Expand All @@ -317,7 +317,7 @@ public boolean hasNext() {
} }


@Override @Override
public String next() { public T next() {
return iterator().next(); return iterator().next();
} }
} }
Expand Down
Expand Up @@ -237,13 +237,14 @@ public CompletableFuture<String> next() {
private class DistributedCollectionPartitionIterator implements AsyncIterator<String> { private class DistributedCollectionPartitionIterator implements AsyncIterator<String> {
private final PartitionId partitionId; private final PartitionId partitionId;
private final long iteratorId; private final long iteratorId;
private volatile CompletableFuture<DistributedCollectionService.Batch> batch; private volatile CompletableFuture<DistributedCollectionService.Batch<String>> batch;
private volatile CompletableFuture<Void> closeFuture; private volatile CompletableFuture<Void> closeFuture;


DistributedCollectionPartitionIterator(PartitionId partitionId, long iteratorId) { DistributedCollectionPartitionIterator(PartitionId partitionId, long iteratorId) {
this.partitionId = partitionId; this.partitionId = partitionId;
this.iteratorId = iteratorId; this.iteratorId = iteratorId;
this.batch = CompletableFuture.completedFuture(new DistributedCollectionService.Batch(0, Collections.emptyList())); this.batch = CompletableFuture.completedFuture(
new DistributedCollectionService.Batch<>(0, Collections.emptyList()));
} }


/** /**
Expand All @@ -267,7 +268,7 @@ private CompletableFuture<Iterator<String>> batch() {
* @param position the position from which to fetch the next batch * @param position the position from which to fetch the next batch
* @return the next batch of entries from the cluster * @return the next batch of entries from the cluster
*/ */
private CompletableFuture<DistributedCollectionService.Batch> fetch(int position) { private CompletableFuture<DistributedCollectionService.Batch<String>> fetch(int position) {
return getProxyClient().applyOn(partitionId, service -> service.next(iteratorId, position)) return getProxyClient().applyOn(partitionId, service -> service.next(iteratorId, position))
.thenCompose(batch -> { .thenCompose(batch -> {
if (batch == null) { if (batch == null) {
Expand Down

0 comments on commit b06d4ff

Please sign in to comment.