Skip to content

Commit

Permalink
Refactor all iterator management service operations to optimize for s…
Browse files Browse the repository at this point in the history
…mall iterator batches.
  • Loading branch information
kuujo committed Aug 8, 2018
1 parent dfecbd5 commit b6e2d3e
Show file tree
Hide file tree
Showing 17 changed files with 282 additions and 107 deletions.
Expand Up @@ -34,6 +34,7 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.Function;


import static io.atomix.core.collection.impl.CollectionUpdateResult.noop; import static io.atomix.core.collection.impl.CollectionUpdateResult.noop;
import static io.atomix.core.collection.impl.CollectionUpdateResult.ok; import static io.atomix.core.collection.impl.CollectionUpdateResult.ok;
Expand Down Expand Up @@ -183,9 +184,23 @@ public void unlisten() {
} }


@Override @Override
public long iterate() { public IteratorBatch<E> iterate() {
iterators.put(getCurrentIndex(), new IteratorContext(getCurrentSession().sessionId().id())); return iterate(IteratorContext::new);
return getCurrentIndex(); }

protected IteratorBatch<E> iterate(Function<Long, AbstractIteratorContext> contextFactory) {
AbstractIteratorContext iterator = contextFactory.apply(getCurrentSession().sessionId().id());
if (!iterator.iterator().hasNext()) {
return null;
}

long iteratorId = getCurrentIndex();
iterators.put(iteratorId, iterator);
IteratorBatch<E> batch = next(iteratorId, 0);
if (batch.complete()) {
iterators.remove(iteratorId);
}
return batch;
} }


@Override @Override
Expand All @@ -211,7 +226,7 @@ public IteratorBatch<E> next(long iteratorId, int position) {
if (elements.isEmpty()) { if (elements.isEmpty()) {
return null; return null;
} }
return new IteratorBatch<>(context.position(), elements); return new IteratorBatch<>(iteratorId, context.position(), elements, !context.iterator().hasNext());
} }


@Override @Override
Expand Down
Expand Up @@ -29,7 +29,7 @@ public interface IterableService<E> {
* @return the iterator ID * @return the iterator ID
*/ */
@Command @Command
long iterate(); IteratorBatch<E> iterate();


/** /**
* Returns the next batch of elements for the given iterator. * Returns the next batch of elements for the given iterator.
Expand Down
Expand Up @@ -22,13 +22,26 @@
* Iterator batch. * Iterator batch.
*/ */
public final class IteratorBatch<T> implements Iterator<T> { public final class IteratorBatch<T> implements Iterator<T> {
private final long id;
private final int position; private final int position;
private final Collection<T> entries; private final Collection<T> entries;
private final boolean complete;
private transient volatile Iterator<T> iterator; private transient volatile Iterator<T> iterator;


public IteratorBatch(int position, Collection<T> entries) { public IteratorBatch(long id, int position, Collection<T> entries, boolean complete) {
this.id = id;
this.position = position; this.position = position;
this.entries = entries; this.entries = entries;
this.complete = complete;
}

/**
* Returns the iterator identifier.
*
* @return the iterator identifier
*/
public long id() {
return id;
} }


/** /**
Expand All @@ -49,6 +62,15 @@ public Collection<T> entries() {
return entries; return entries;
} }


/**
* Returns a boolean indicating whether the batch is complete.
*
* @return indicates whether this batch completes iteration
*/
public boolean complete() {
return complete;
}

private Iterator<T> iterator() { private Iterator<T> iterator() {
if (iterator == null) { if (iterator == null) {
synchronized (this) { synchronized (this) {
Expand Down
Expand Up @@ -19,13 +19,14 @@
* Iterator open function. * Iterator open function.
*/ */
@FunctionalInterface @FunctionalInterface
public interface OpenFunction<S> { public interface OpenFunction<S, T> {


/** /**
* Opens the iterator using the given service proxy. * Opens the iterator using the given service proxy.
* *
* @param service the service proxy * @param service the service proxy
* @return * @return the initial iterator batch
*/ */
long open(S service); IteratorBatch<T> open(S service);

} }
Expand Up @@ -29,15 +29,15 @@
public class PartitionedProxyIterator<S, T> implements AsyncIterator<T> { public class PartitionedProxyIterator<S, T> implements AsyncIterator<T> {
private final ProxyClient<S> client; private final ProxyClient<S> client;
private final Iterator<AsyncIterator<T>> partitions; private final Iterator<AsyncIterator<T>> partitions;
private final OpenFunction<S> openFunction; private final OpenFunction<S, T> openFunction;
private final NextFunction<S, T> nextFunction; private final NextFunction<S, T> nextFunction;
private final CloseFunction<S> closeFunction; private final CloseFunction<S> closeFunction;
private volatile AsyncIterator<T> iterator; private volatile AsyncIterator<T> iterator;
private AtomicBoolean closed = new AtomicBoolean(); private AtomicBoolean closed = new AtomicBoolean();


public PartitionedProxyIterator( public PartitionedProxyIterator(
ProxyClient<S> client, ProxyClient<S> client,
OpenFunction<S> openFunction, OpenFunction<S, T> openFunction,
NextFunction<S, T> nextFunction, NextFunction<S, T> nextFunction,
CloseFunction<S> closeFunction) { CloseFunction<S> closeFunction) {
this.client = client; this.client = client;
Expand Down
34 changes: 21 additions & 13 deletions core/src/main/java/io/atomix/core/iterator/impl/ProxyIterator.java
Expand Up @@ -21,7 +21,6 @@
import io.atomix.utils.concurrent.Futures; import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.OrderedFuture; import io.atomix.utils.concurrent.OrderedFuture;


import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
Expand All @@ -35,23 +34,22 @@ public class ProxyIterator<S, T> implements AsyncIterator<T> {
private final PartitionId partitionId; private final PartitionId partitionId;
private final NextFunction<S, T> nextFunction; private final NextFunction<S, T> nextFunction;
private final CloseFunction<S> closeFunction; private final CloseFunction<S> closeFunction;
private final CompletableFuture<Long> openFuture; private final CompletableFuture<IteratorBatch<T>> openFuture;
private volatile CompletableFuture<IteratorBatch<T>> batch; private volatile CompletableFuture<IteratorBatch<T>> batch;
private volatile CompletableFuture<Void> closeFuture; private volatile CompletableFuture<Void> closeFuture;


public ProxyIterator( public ProxyIterator(
ProxyClient<S> client, ProxyClient<S> client,
PartitionId partitionId, PartitionId partitionId,
OpenFunction<S> openFunction, OpenFunction<S, T> openFunction,
NextFunction<S, T> nextFunction, NextFunction<S, T> nextFunction,
CloseFunction<S> closeFunction) { CloseFunction<S> closeFunction) {
this.client = client; this.client = client;
this.partitionId = partitionId; this.partitionId = partitionId;
this.nextFunction = nextFunction; this.nextFunction = nextFunction;
this.closeFunction = closeFunction; this.closeFunction = closeFunction;
this.openFuture = OrderedFuture.wrap(client.applyOn(partitionId, openFunction::open)); this.openFuture = OrderedFuture.wrap(client.applyOn(partitionId, openFunction::open));
this.batch = CompletableFuture.completedFuture( this.batch = openFuture;
new IteratorBatch<T>(0, Collections.emptyList()));
} }


/** /**
Expand All @@ -76,21 +74,31 @@ private CompletableFuture<Iterator<T>> batch() {
* @return the next batch of entries from the cluster * @return the next batch of entries from the cluster
*/ */
private CompletableFuture<IteratorBatch<T>> fetch(int position) { private CompletableFuture<IteratorBatch<T>> fetch(int position) {
return openFuture.thenCompose(id -> client.applyOn(partitionId, service -> nextFunction.next(service, id, position)) return openFuture.thenCompose(initialBatch -> {
.thenCompose(batch -> { if (!initialBatch.complete()) {
if (batch == null) { return client.applyOn(partitionId, service -> nextFunction.next(service, initialBatch.id(), position))
return close().thenApply(v -> null); .thenCompose(nextBatch -> {
} if (nextBatch == null) {
return CompletableFuture.completedFuture(batch); return close().thenApply(v -> null);
})); }
return CompletableFuture.completedFuture(nextBatch);
});
}
return CompletableFuture.completedFuture(null);
});
} }


@Override @Override
public CompletableFuture<Void> close() { public CompletableFuture<Void> close() {
if (closeFuture == null) { if (closeFuture == null) {
synchronized (this) { synchronized (this) {
if (closeFuture == null) { if (closeFuture == null) {
closeFuture = openFuture.thenCompose(id -> client.acceptOn(partitionId, service -> closeFunction.close(service, id))); closeFuture = openFuture.thenCompose(initialBatch -> {
if (initialBatch != null && !initialBatch.complete()) {
return client.acceptOn(partitionId, service -> closeFunction.close(service, initialBatch.id()));
}
return CompletableFuture.completedFuture(null);
});
} }
} }
} }
Expand Down
Expand Up @@ -46,6 +46,8 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;


Expand Down Expand Up @@ -534,61 +536,80 @@ public void clear() {
} }


@Override @Override
public long iterateKeys() { public IteratorBatch<K> iterateKeys() {
return iterateEntries(); return iterate(DefaultIterator::new, (k, v) -> k);
} }


@Override @Override
public IteratorBatch<K> nextKeys(long iteratorId, int position) { public IteratorBatch<K> nextKeys(long iteratorId, int position) {
IteratorBatch<Map.Entry<K, Versioned<byte[]>>> batch = nextEntries(iteratorId, position); return next(iteratorId, position, (k, v) -> k);
return batch == null ? null : new IteratorBatch<>(batch.position(), batch.entries().stream()
.map(Map.Entry::getKey)
.collect(Collectors.toList()));
} }


@Override @Override
public void closeKeys(long iteratorId) { public void closeKeys(long iteratorId) {
closeEntries(iteratorId); close(iteratorId);
} }


@Override @Override
public long iterateValues() { public IteratorBatch<Versioned<byte[]>> iterateValues() {
return iterateEntries(); return iterate(DefaultIterator::new, (k, v) -> v);
} }


@Override @Override
public IteratorBatch<Versioned<byte[]>> nextValues(long iteratorId, int position) { public IteratorBatch<Versioned<byte[]>> nextValues(long iteratorId, int position) {
IteratorBatch<Map.Entry<K, Versioned<byte[]>>> batch = nextEntries(iteratorId, position); return next(iteratorId, position, (k, v) -> v);
return batch == null ? null : new IteratorBatch<>(batch.position(), batch.entries().stream()
.map(Map.Entry::getValue)
.collect(Collectors.toList()));
} }


@Override @Override
public void closeValues(long iteratorId) { public void closeValues(long iteratorId) {
closeEntries(iteratorId); close(iteratorId);
} }


@Override @Override
public long iterateEntries() { public IteratorBatch<Map.Entry<K, Versioned<byte[]>>> iterateEntries() {
entryIterators.put(getCurrentIndex(), new DefaultIterator(getCurrentSession().sessionId().id())); return iterate(DefaultIterator::new, Maps::immutableEntry);
return getCurrentIndex();
} }


@Override @Override
public IteratorBatch<Map.Entry<K, Versioned<byte[]>>> nextEntries(long iteratorId, int position) { public IteratorBatch<Map.Entry<K, Versioned<byte[]>>> nextEntries(long iteratorId, int position) {
return next(iteratorId, position, Maps::immutableEntry);
}

@Override
public void closeEntries(long iteratorId) {
close(iteratorId);
}

protected <T> IteratorBatch<T> iterate(
Function<Long, IteratorContext> contextFactory,
BiFunction<K, Versioned<byte[]>, T> function) {
IteratorContext iterator = contextFactory.apply(getCurrentSession().sessionId().id());
if (!iterator.iterator().hasNext()) {
return null;
}

long iteratorId = getCurrentIndex();
entryIterators.put(iteratorId, iterator);
IteratorBatch<T> batch = next(iteratorId, 0, function);
if (batch.complete()) {
entryIterators.remove(iteratorId);
}
return batch;
}

protected <T> IteratorBatch<T> next(long iteratorId, int position, BiFunction<K, Versioned<byte[]>, T> function) {
IteratorContext context = entryIterators.get(iteratorId); IteratorContext context = entryIterators.get(iteratorId);
if (context == null) { if (context == null) {
return null; return null;
} }


List<Map.Entry<K, Versioned<byte[]>>> entries = new ArrayList<>(); List<T> entries = new ArrayList<>();
int size = 0; int size = 0;
while (context.iterator().hasNext()) { while (context.iterator().hasNext()) {
context.incrementPosition(); context.incrementPosition();
if (context.position() > position) { if (context.position() > position) {
Map.Entry<K, MapEntryValue> entry = context.iterator().next(); Map.Entry<K, MapEntryValue> entry = context.iterator().next();
entries.add(Maps.immutableEntry(entry.getKey(), toVersioned(entry.getValue()))); entries.add(function.apply(entry.getKey(), toVersioned(entry.getValue())));
size += entry.getValue().value().length; size += entry.getValue().value().length;


if (size >= MAX_ITERATOR_BATCH_SIZE) { if (size >= MAX_ITERATOR_BATCH_SIZE) {
Expand All @@ -600,11 +621,10 @@ public IteratorBatch<Map.Entry<K, Versioned<byte[]>>> nextEntries(long iteratorI
if (entries.isEmpty()) { if (entries.isEmpty()) {
return null; return null;
} }
return new IteratorBatch<>(context.position, entries); return new IteratorBatch<>(iteratorId, context.position, entries, !context.iterator().hasNext());
} }


@Override protected void close(long iteratorId) {
public void closeEntries(long iteratorId) {
entryIterators.remove(iteratorId); entryIterators.remove(iteratorId);
} }


Expand Down
Expand Up @@ -17,6 +17,7 @@
package io.atomix.core.map.impl; package io.atomix.core.map.impl;


import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import io.atomix.core.iterator.impl.IteratorBatch;
import io.atomix.core.map.AtomicNavigableMapType; import io.atomix.core.map.AtomicNavigableMapType;
import io.atomix.core.transaction.TransactionId; import io.atomix.core.transaction.TransactionId;
import io.atomix.primitive.PrimitiveType; import io.atomix.primitive.PrimitiveType;
Expand Down Expand Up @@ -253,15 +254,28 @@ public int subMapSize(K fromKey, boolean fromInclusive, K toKey, boolean toInclu
} }


@Override @Override
public long subMapIterate(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) { public IteratorBatch<K> subMapIterateKeys(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
entryIterators.put(getCurrentIndex(), new AscendingIterator(getCurrentSession().sessionId().id(), fromKey, fromInclusive, toKey, toInclusive)); return iterate(sessionId -> new AscendingIterator(sessionId, fromKey, fromInclusive, toKey, toInclusive), (k, v) -> k);
return getCurrentIndex();
} }


@Override @Override
public long subMapIterateDescending(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) { public IteratorBatch<Map.Entry<K, Versioned<byte[]>>> subMapIterateEntries(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
entryIterators.put(getCurrentIndex(), new DescendingIterator(getCurrentSession().sessionId().id(), fromKey, fromInclusive, toKey, toInclusive)); return iterate(sessionId -> new AscendingIterator(sessionId, fromKey, fromInclusive, toKey, toInclusive), Maps::immutableEntry);
return getCurrentIndex(); }

@Override
public IteratorBatch<Versioned<byte[]>> subMapIterateValues(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
return iterate(sessionId -> new AscendingIterator(sessionId, fromKey, fromInclusive, toKey, toInclusive), (k, v) -> v);
}

@Override
public IteratorBatch<K> subMapIterateDescendingKeys(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
return iterate(sessionId -> new DescendingIterator(sessionId, fromKey, fromInclusive, toKey, toInclusive), (k, v) -> k);
}

@Override
public IteratorBatch<Map.Entry<K, Versioned<byte[]>>> subMapIterateDescendingEntries(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
return iterate(sessionId -> new DescendingIterator(sessionId, fromKey, fromInclusive, toKey, toInclusive), Maps::immutableEntry);
} }


@Override @Override
Expand Down

0 comments on commit b6e2d3e

Please sign in to comment.