Skip to content

Commit

Permalink
ignite-2921: ScanQueries over local partitions performance optimisation
Browse files Browse the repository at this point in the history
  • Loading branch information
ashutakGG committed May 16, 2016
1 parent 9a9c35d commit 0e8072f
Show file tree
Hide file tree
Showing 25 changed files with 1,449 additions and 572 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentHashMap8;


Expand All @@ -34,11 +35,10 @@
*/ */
public class CacheWeakQueryIteratorsHolder<V> { public class CacheWeakQueryIteratorsHolder<V> {
/** Iterators weak references queue. */ /** Iterators weak references queue. */
private final ReferenceQueue<WeakQueryFutureIterator> refQueue = new ReferenceQueue<>(); private final ReferenceQueue refQueue = new ReferenceQueue();


/** Iterators futures. */ /** Iterators futures. */
private final Map<WeakReference<WeakQueryFutureIterator>,CacheQueryFuture<V>> futs = private final Map<WeakReference, AutoCloseable> refs = new ConcurrentHashMap8<>();
new ConcurrentHashMap8<>();


/** Logger. */ /** Logger. */
private final IgniteLogger log; private final IgniteLogger log;
Expand All @@ -56,10 +56,27 @@ public CacheWeakQueryIteratorsHolder(IgniteLogger log) {
* @param <T> Type for the iterator. * @param <T> Type for the iterator.
* @return Iterator over the cache. * @return Iterator over the cache.
*/ */
public <T> WeakQueryFutureIterator iterator(CacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) { public <T> WeakReferenceCloseableIterator<T> iterator(final CacheQueryFuture<V> fut,
CacheIteratorConverter<T, V> convert) {
WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut, convert); WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut, convert);


CacheQueryFuture<V> old = futs.put(it.weakReference(), fut); AutoCloseable old = refs.put(it.weakReference(), fut);

assert old == null;

return it;
}

/**
* @param iter Closeable iterator.
* @param <T> Type for the iterator.
* @return Iterator over the cache.
*/
public <T> WeakReferenceCloseableIterator<T> iterator(final GridCloseableIterator<V> iter,
CacheIteratorConverter<T, V> convert) {
WeakQueryCloseableIterator it = new WeakQueryCloseableIterator(iter, convert);

AutoCloseable old = refs.put(it.weakReference(), iter);


assert old == null; assert old == null;


Expand All @@ -71,8 +88,8 @@ public <T> WeakQueryFutureIterator iterator(CacheQueryFuture<V> fut, CacheIterat
* *
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
public void removeIterator(WeakQueryFutureIterator it) throws IgniteCheckedException { public void removeIterator(WeakReferenceCloseableIterator it) throws IgniteCheckedException {
futs.remove(it.weakReference()); refs.remove(it.weakReference());


it.close(); it.close();
} }
Expand All @@ -81,17 +98,17 @@ public void removeIterator(WeakQueryFutureIterator it) throws IgniteCheckedExcep
* Closes unreachable iterators. * Closes unreachable iterators.
*/ */
public void checkWeakQueue() { public void checkWeakQueue() {
for (Reference<? extends WeakQueryFutureIterator> itRef = refQueue.poll(); itRef != null; for (Reference itRef = refQueue.poll(); itRef != null;
itRef = refQueue.poll()) { itRef = refQueue.poll()) {
try { try {
WeakReference<WeakQueryFutureIterator> weakRef = (WeakReference<WeakQueryFutureIterator>)itRef; WeakReference weakRef = (WeakReference)itRef;


CacheQueryFuture<?> fut = futs.remove(weakRef); AutoCloseable rsrc = refs.remove(weakRef);


if (fut != null) if (rsrc != null)
fut.cancel(); rsrc.close();
} }
catch (IgniteCheckedException e) { catch (Exception e) {
U.error(log, "Failed to close iterator.", e); U.error(log, "Failed to close iterator.", e);
} }
} }
Expand All @@ -101,16 +118,16 @@ public void checkWeakQueue() {
* Cancel all cache queries. * Cancel all cache queries.
*/ */
public void clearQueries(){ public void clearQueries(){
for (CacheQueryFuture<?> fut : futs.values()) { for (AutoCloseable rsrc : refs.values()) {
try { try {
fut.cancel(); rsrc.close();
} }
catch (IgniteCheckedException e) { catch (Exception e) {
U.error(log, "Failed to close iterator.", e); U.error(log, "Failed to close iterator.", e);
} }
} }


futs.clear(); refs.clear();
} }




Expand All @@ -119,7 +136,8 @@ public void clearQueries(){
* *
* @param <T> Type for iterator. * @param <T> Type for iterator.
*/ */
public class WeakQueryFutureIterator<T> extends GridCloseableIteratorAdapter<T> { private class WeakQueryFutureIterator<T> extends GridCloseableIteratorAdapter<T>
implements WeakReferenceCloseableIterator<T> {
/** */ /** */
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


Expand Down Expand Up @@ -204,10 +222,8 @@ public class WeakQueryFutureIterator<T> extends GridCloseableIteratorAdapter<T>
cur = null; cur = null;
} }


/** /** {@inheritDoc} */
* @return Iterator weak reference. @Override public WeakReference<WeakQueryFutureIterator<T>> weakReference() {
*/
private WeakReference<WeakQueryFutureIterator<T>> weakReference() {
return weakRef; return weakRef;
} }


Expand All @@ -217,7 +233,7 @@ private WeakReference<WeakQueryFutureIterator<T>> weakReference() {
private void clearWeakReference() { private void clearWeakReference() {
weakRef.clear(); weakRef.clear();


futs.remove(weakRef); refs.remove(weakRef);
} }


/** /**
Expand All @@ -233,4 +249,109 @@ private void init() throws IgniteCheckedException {
} }
} }
} }
}
/**
* @param <T> Type.
*/
public class WeakQueryCloseableIterator<T> extends GridCloseableIteratorAdapter<T>
implements WeakReferenceCloseableIterator<T> {
/** */
private static final long serialVersionUID = 0;

/** */
private final GridCloseableIterator<V> iter;

/** */
private final CacheIteratorConverter<T, V> convert;

/** */
private final WeakReference weakRef;

/** */
private T cur;

/**
* @param iter Iterator.
* @param convert Converter.
*/
WeakQueryCloseableIterator(GridCloseableIterator<V> iter, CacheIteratorConverter<T, V> convert) {
this.iter = iter;
this.convert = convert;

weakRef = new WeakReference(this, refQueue);
}


/** {@inheritDoc} */
@Override protected T onNext() throws IgniteCheckedException {
V next;

try {
next = iter.nextX();
}
catch (NoSuchElementException e){
clearWeakReference();

throw e;
}

if (next == null)
clearWeakReference();

cur = next != null ? convert.convert(next) : null;

return cur;
}

/** {@inheritDoc} */
@Override protected boolean onHasNext() throws IgniteCheckedException {
boolean hasNextX = iter.hasNextX();

if (!hasNextX)
clearWeakReference();

return hasNextX;
}

/** {@inheritDoc} */
@Override protected void onRemove() throws IgniteCheckedException {
if (cur == null)
throw new IllegalStateException();

convert.remove(cur);

cur = null;
}

/** {@inheritDoc} */
@Override protected void onClose() throws IgniteCheckedException {
iter.close();

clearWeakReference();
}

/**
* Clears weak reference.
*/
private void clearWeakReference() {
weakRef.clear();

refs.remove(weakRef);
}

/** {@inheritDoc} */
@Override public WeakReference weakReference() {
return weakRef;
}
}

/**
*
*/
public static interface WeakReferenceCloseableIterator<T> extends GridCloseableIterator<T> {
/**
* @return Iterator weak reference.
*/
public WeakReference weakReference();
}
}
Expand Up @@ -86,7 +86,6 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
Expand All @@ -105,6 +104,7 @@
import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.C1;
Expand Down Expand Up @@ -2400,7 +2400,8 @@ private <T> EntryProcessorResult<T> invoke0(
validateCacheKeys(keys); validateCacheKeys(keys);


IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(keys) { IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(keys) {
@Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx,
AffinityTopologyVersion readyTopVer) {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() {
@Override public EntryProcessor apply(K k) { @Override public EntryProcessor apply(K k) {
return entryProcessor; return entryProcessor;
Expand Down Expand Up @@ -3961,19 +3962,19 @@ private Iterator<Cache.Entry<K, V>> localIteratorHonorExpirePolicy(final CacheOp
/** /**
* @return Distributed ignite cache iterator. * @return Distributed ignite cache iterator.
*/ */
public Iterator<Cache.Entry<K, V>> igniteIterator() { public Iterator<Cache.Entry<K, V>> igniteIterator() throws IgniteCheckedException {
GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx; GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;


final CacheOperationContext opCtx = ctx.operationContextPerCall(); final CacheOperationContext opCtx = ctx.operationContextPerCall();


if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1) if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1)
return localIteratorHonorExpirePolicy(opCtx); return localIteratorHonorExpirePolicy(opCtx);


CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, null, ctx.keepBinary()) final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(null, null, ctx.keepBinary())
.keepAll(false) .keepAll(false)
.execute(); .executeScanQuery();


return ctx.itHolder().iterator(fut, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() { return ctx.itHolder().iterator(iter, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() {
@Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) { @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
return new CacheEntryImpl<>(e.getKey(), e.getValue()); return new CacheEntryImpl<>(e.getKey(), e.getValue());
} }
Expand Down
Expand Up @@ -1848,7 +1848,7 @@ private Iterator<KeyCacheObject> keyIterator(
* @return Off-heap iterator. * @return Off-heap iterator.
*/ */
public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c, public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c,
Integer part, @Nullable Integer part,
boolean primary, boolean primary,
boolean backup) boolean backup)
{ {
Expand All @@ -1859,8 +1859,12 @@ public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Intege


checkIteratorQueue(); checkIteratorQueue();


if (primary && backup) if (primary && backup) {
return offheap.iterator(spaceName, c); if (part == null)
return offheap.iterator(spaceName, c);
else
return offheap.iterator(spaceName, c, part);
}


AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion(); AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();


Expand Down Expand Up @@ -1894,7 +1898,7 @@ public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(@Null
if (!offheapEnabled || (!primary && !backup)) if (!offheapEnabled || (!primary && !backup))
return new GridEmptyCloseableIterator<>(); return new GridEmptyCloseableIterator<>();


if (primary && backup) if (primary && backup && part == null)
return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() { return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() {
private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = offheap.iterator(spaceName); private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = offheap.iterator(spaceName);


Expand Down

0 comments on commit 0e8072f

Please sign in to comment.