Skip to content

Commit

Permalink
#IGNITE-53: Move CacheWeakQueryIteratorsHolder to context.
Browse files Browse the repository at this point in the history
  • Loading branch information
ivasilinets committed Jan 19, 2015
1 parent d803400 commit 9a5dcb3
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 61 deletions.
Expand Up @@ -25,6 +25,7 @@
import org.apache.ignite.resources.*; import org.apache.ignite.resources.*;
import org.gridgain.grid.*; import org.gridgain.grid.*;
import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.query.*;
import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.tostring.*;
Expand Down Expand Up @@ -62,9 +63,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
/** Projection. */ /** Projection. */
private GridCacheProjectionImpl<K, V> prj; private GridCacheProjectionImpl<K, V> prj;


/** Iterator holder. */
private final CacheWeakQueryIteratorsHolder<Entry<K, V>, Map.Entry<K, V>> itHolder;

/** /**
* @param ctx Context. * @param ctx Context.
* @param delegate Delegate. * @param delegate Delegate.
Expand All @@ -85,16 +83,6 @@ public IgniteCacheProxy(GridCacheContext<K, V> ctx,
this.delegate = delegate; this.delegate = delegate;
this.prj = prj; this.prj = prj;


itHolder = new CacheWeakQueryIteratorsHolder<Entry<K, V>, Map.Entry<K, V>>(ctx.logger(IgniteCacheProxy.class)) {
@Override protected Entry<K, V> convert(Map.Entry<K, V> e) {
return new CacheEntryImpl<>(e.getKey(), e.getValue());
}

@Override protected void remove(Entry<K, V> item) {
IgniteCacheProxy.this.remove(item.getKey());
}
};

gate = ctx.gate(); gate = ctx.gate();
} }


Expand Down Expand Up @@ -942,9 +930,19 @@ public void removeAll(Collection<? extends K> keys) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj); GridCacheProjectionImpl<K, V> prev = gate.enter(prj);


try { try {
itHolder.checkWeakQueue(); ctx.itHolder().checkWeakQueue();

GridCacheQueryFuture<Map.Entry<K, V>> fut = delegate.queries().createScanQuery(null).execute();


return itHolder.iterator(delegate.queries().createScanQuery(null).execute()); return ctx.itHolder().iterator(fut, new CacheIteratorConverter<Entry<K, V>, Map.Entry<K, V>>() {
@Override protected Entry<K, V> convert(Map.Entry<K, V> e) {
return new CacheEntryImpl<>(e.getKey(), e.getValue());
}

@Override protected void remove(Entry<K, V> item) {
IgniteCacheProxy.this.remove(item.getKey());
}
});
} }
finally { finally {
gate.leave(prev); gate.leave(prev);
Expand Down
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.gridgain.grid.kernal.processors.cache;

/**
* @param <T> Type for iterator.
* @param <V> Type for cache query future.
*/
public abstract class CacheIteratorConverter <T, V> {
/**
* Converts class V to class T.
*
* @param v Item to convert.
* @return Converted item.
*/
protected abstract T convert(V v);

/**
* Removes item.
*
* @param item Item to remove.
*/
protected abstract void remove(T item);
}
Expand Up @@ -26,10 +26,9 @@
import java.util.*; import java.util.*;


/** /**
* @param <T> Type for iterator.
* @param <V> Type for cache query future. * @param <V> Type for cache query future.
*/ */
public abstract class CacheWeakQueryIteratorsHolder<T, V> { public class CacheWeakQueryIteratorsHolder<V> {
/** Iterators weak references queue. */ /** Iterators weak references queue. */
private final ReferenceQueue<WeakQueryFutureIterator> refQueue = new ReferenceQueue<>(); private final ReferenceQueue<WeakQueryFutureIterator> refQueue = new ReferenceQueue<>();


Expand All @@ -48,12 +47,13 @@ public CacheWeakQueryIteratorsHolder(IgniteLogger log) {
} }


/** /**
* Iterator over the cache. * @param fut Query to iterate.
* @param fut Query to iterate * @param convert Cache iterator converter.
* @return iterator * @param <T> Type for the iterator.
* @return Iterator over the cache.
*/ */
public WeakQueryFutureIterator iterator(GridCacheQueryFuture<V> fut) { public <T> WeakQueryFutureIterator iterator(GridCacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) {
WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut); WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut, convert);


GridCacheQueryFuture<V> old = futs.put(it.weakReference(), fut); GridCacheQueryFuture<V> old = futs.put(it.weakReference(), fut);


Expand Down Expand Up @@ -94,7 +94,7 @@ public void checkWeakQueue() {
} }


/** /**
* Cancel all cache queries * Cancel all cache queries.
*/ */
public void clearQueries(){ public void clearQueries(){
for (GridCacheQueryFuture<?> fut : futs.values()) { for (GridCacheQueryFuture<?> fut : futs.values()) {
Expand All @@ -109,30 +109,20 @@ public void clearQueries(){
futs.clear(); futs.clear();
} }


/**
* Converts class V to class T.
*
* @param v Item to convert.
* @return Converted item.
*/
protected abstract T convert(V v);

/**
* Removes item.
*
* @param item Item to remove.
*/
protected abstract void remove(T item);


/** /**
* Iterator based of {@link GridCacheQueryFuture}. * Iterator based of {@link GridCacheQueryFuture}.
*
* @param <T> Type for iterator.
*/ */
public class WeakQueryFutureIterator extends GridCloseableIteratorAdapter<T> { public class WeakQueryFutureIterator<T> extends GridCloseableIteratorAdapter<T> {
/** Query future. */ /** Query future. */
private final GridCacheQueryFuture<V> fut; private final GridCacheQueryFuture<V> fut;


/** Weak reference. */ /** Weak reference. */
private final WeakReference<WeakQueryFutureIterator> weakRef; private final WeakReference<WeakQueryFutureIterator<T>> weakRef;

CacheIteratorConverter<T, V> convert;


/** Init flag. */ /** Init flag. */
private boolean init; private boolean init;
Expand All @@ -146,10 +136,12 @@ public class WeakQueryFutureIterator extends GridCloseableIteratorAdapter<T> {
/** /**
* @param fut GridCacheQueryFuture to iterate. * @param fut GridCacheQueryFuture to iterate.
*/ */
WeakQueryFutureIterator(GridCacheQueryFuture<V> fut) { WeakQueryFutureIterator(GridCacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) {
this.fut = fut; this.fut = fut;


this.weakRef = new WeakReference<>(this, refQueue); this.weakRef = new WeakReference<>(this, refQueue);

this.convert = convert;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand All @@ -169,7 +161,7 @@ public class WeakQueryFutureIterator extends GridCloseableIteratorAdapter<T> {
if (futNext == null) if (futNext == null)
clearWeakReference(); clearWeakReference();


next = futNext != null ? convert(futNext) : null; next = futNext != null ? convert.convert(futNext) : null;


return cur; return cur;
} }
Expand Down Expand Up @@ -198,15 +190,15 @@ public class WeakQueryFutureIterator extends GridCloseableIteratorAdapter<T> {
if (cur == null) if (cur == null)
throw new IllegalStateException(); throw new IllegalStateException();


CacheWeakQueryIteratorsHolder.this.remove(cur); convert.remove(cur);


cur = null; cur = null;
} }


/** /**
* @return Iterator weak reference. * @return Iterator weak reference.
*/ */
private WeakReference<WeakQueryFutureIterator> weakReference() { private WeakReference<WeakQueryFutureIterator<T>> weakReference() {
return weakRef; return weakRef;
} }


Expand All @@ -226,7 +218,7 @@ private void init() throws IgniteCheckedException {
if (!init) { if (!init) {
V futNext = fut.next(); V futNext = fut.next();


next = futNext != null ? convert(futNext) : null; next = futNext != null ? convert.convert(futNext) : null;


init = true; init = true;
} }
Expand Down
Expand Up @@ -186,6 +186,9 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Default expiry policy. */ /** Default expiry policy. */
private ExpiryPolicy expiryPlc; private ExpiryPolicy expiryPlc;


/** Cache weak query iterator holder. */
private CacheWeakQueryIteratorsHolder<Map.Entry<K, V>> itHolder;

/** /**
* Empty constructor required for {@link Externalizable}. * Empty constructor required for {@link Externalizable}.
*/ */
Expand Down Expand Up @@ -300,6 +303,8 @@ public GridCacheContext(


if (expiryPlc instanceof EternalExpiryPolicy) if (expiryPlc instanceof EternalExpiryPolicy)
expiryPlc = null; expiryPlc = null;

itHolder = new CacheWeakQueryIteratorsHolder(log);
} }


/** /**
Expand Down Expand Up @@ -836,6 +841,13 @@ public GridCacheContinuousQueryManager<K, V> continuousQueries() {
return contQryMgr; return contQryMgr;
} }


/**
* @return Iterators Holder.
*/
public CacheWeakQueryIteratorsHolder<Map.Entry<K, V>> itHolder() {
return itHolder;
}

/** /**
* @return Swap manager. * @return Swap manager.
*/ */
Expand Down
Expand Up @@ -66,9 +66,6 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
/** Removed flag. */ /** Removed flag. */
private volatile boolean rmvd; private volatile boolean rmvd;


/** Iterator holder. */
private final CacheWeakQueryIteratorsHolder<T, Map.Entry<T, ?>> itHolder;

/** /**
* @param ctx Cache context. * @param ctx Cache context.
* @param name Set name. * @param name Set name.
Expand All @@ -84,16 +81,6 @@ public GridCacheSetImpl(GridCacheContext ctx, String name, GridCacheSetHeader hd
cache = ctx.cache(); cache = ctx.cache();


hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name)); hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name));

itHolder = new CacheWeakQueryIteratorsHolder<T, Map.Entry<T, ?>>(ctx.logger(GridCacheSetImpl.class)) {
@Override protected T convert(Map.Entry<T, ?> e) {
return e.getKey();
}

@Override protected void remove(T item) {
GridCacheSetImpl.this.remove(item);
}
};
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -348,10 +335,21 @@ private GridCloseableIterator<T> iterator0() {


qry.projection(ctx.grid().forNodes(nodes)); qry.projection(ctx.grid().forNodes(nodes));


CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it = itHolder.iterator(qry.execute()); GridCacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute();

CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it =
ctx.itHolder().iterator(fut, new CacheIteratorConverter<T, Map.Entry<T, ?>>() {
@Override protected T convert(Map.Entry<T, ?> e) {
return e.getKey();
}

@Override protected void remove(T item) {
GridCacheSetImpl.this.remove(item);
}
});


if (rmvd) { if (rmvd) {
itHolder.removeIterator(it); ctx.itHolder().removeIterator(it);


checkRemoved(); checkRemoved();
} }
Expand Down Expand Up @@ -438,7 +436,7 @@ void removed(boolean rmvd) {
this.rmvd = rmvd; this.rmvd = rmvd;


if (rmvd) if (rmvd)
itHolder.clearQueries(); ctx.itHolder().clearQueries();
} }


/** /**
Expand All @@ -453,7 +451,7 @@ private void checkRemoved() {
* Checks if set was removed and handles iterators weak reference queue. * Checks if set was removed and handles iterators weak reference queue.
*/ */
private void onAccess() { private void onAccess() {
itHolder.checkWeakQueue(); ctx.itHolder().checkWeakQueue();


checkRemoved(); checkRemoved();
} }
Expand Down

0 comments on commit 9a5dcb3

Please sign in to comment.