From 9a5dcb3a4a4c964ff4c53bf57a791170b310ff0c Mon Sep 17 00:00:00 2001 From: ivasilinets Date: Mon, 19 Jan 2015 17:31:18 +0400 Subject: [PATCH] #IGNITE-53: Move CacheWeakQueryIteratorsHolder to context. --- .../processors/cache/IgniteCacheProxy.java | 28 +++++------ .../cache/CacheIteratorConverter.java | 39 +++++++++++++++ .../cache/CacheWeakQueryIteratorsHolder.java | 50 ++++++++----------- .../processors/cache/GridCacheContext.java | 12 +++++ .../datastructures/GridCacheSetImpl.java | 32 ++++++------ 5 files changed, 100 insertions(+), 61 deletions(-) create mode 100644 modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheIteratorConverter.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 3ac11be4f0977..46fbfb4104b93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -25,6 +25,7 @@ import org.apache.ignite.resources.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; +import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.tostring.*; @@ -62,9 +63,6 @@ public class IgniteCacheProxy extends IgniteAsyncSupportAdapter implements /** Projection. */ private GridCacheProjectionImpl prj; - /** Iterator holder. */ - private final CacheWeakQueryIteratorsHolder, Map.Entry> itHolder; - /** * @param ctx Context. * @param delegate Delegate. @@ -85,16 +83,6 @@ public IgniteCacheProxy(GridCacheContext ctx, this.delegate = delegate; this.prj = prj; - itHolder = new CacheWeakQueryIteratorsHolder, Map.Entry>(ctx.logger(IgniteCacheProxy.class)) { - @Override protected Entry convert(Map.Entry e) { - return new CacheEntryImpl<>(e.getKey(), e.getValue()); - } - - @Override protected void remove(Entry item) { - IgniteCacheProxy.this.remove(item.getKey()); - } - }; - gate = ctx.gate(); } @@ -942,9 +930,19 @@ public void removeAll(Collection keys) { GridCacheProjectionImpl prev = gate.enter(prj); try { - itHolder.checkWeakQueue(); + ctx.itHolder().checkWeakQueue(); + + GridCacheQueryFuture> fut = delegate.queries().createScanQuery(null).execute(); - return itHolder.iterator(delegate.queries().createScanQuery(null).execute()); + return ctx.itHolder().iterator(fut, new CacheIteratorConverter, Map.Entry>() { + @Override protected Entry convert(Map.Entry e) { + return new CacheEntryImpl<>(e.getKey(), e.getValue()); + } + + @Override protected void remove(Entry item) { + IgniteCacheProxy.this.remove(item.getKey()); + } + }); } finally { gate.leave(prev); diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheIteratorConverter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheIteratorConverter.java new file mode 100644 index 0000000000000..466460dd0d866 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheIteratorConverter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache; + +/** + * @param Type for iterator. + * @param Type for cache query future. + */ +public abstract class CacheIteratorConverter { + /** + * Converts class V to class T. + * + * @param v Item to convert. + * @return Converted item. + */ + protected abstract T convert(V v); + + /** + * Removes item. + * + * @param item Item to remove. + */ + protected abstract void remove(T item); +} diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java index 1459927ae7b63..2980663335df2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java @@ -26,10 +26,9 @@ import java.util.*; /** - * @param Type for iterator. * @param Type for cache query future. */ -public abstract class CacheWeakQueryIteratorsHolder { +public class CacheWeakQueryIteratorsHolder { /** Iterators weak references queue. */ private final ReferenceQueue refQueue = new ReferenceQueue<>(); @@ -48,12 +47,13 @@ public CacheWeakQueryIteratorsHolder(IgniteLogger log) { } /** - * Iterator over the cache. - * @param fut Query to iterate - * @return iterator + * @param fut Query to iterate. + * @param convert Cache iterator converter. + * @param Type for the iterator. + * @return Iterator over the cache. */ - public WeakQueryFutureIterator iterator(GridCacheQueryFuture fut) { - WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut); + public WeakQueryFutureIterator iterator(GridCacheQueryFuture fut, CacheIteratorConverter convert) { + WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut, convert); GridCacheQueryFuture old = futs.put(it.weakReference(), fut); @@ -94,7 +94,7 @@ public void checkWeakQueue() { } /** - * Cancel all cache queries + * Cancel all cache queries. */ public void clearQueries(){ for (GridCacheQueryFuture fut : futs.values()) { @@ -109,30 +109,20 @@ public void clearQueries(){ futs.clear(); } - /** - * Converts class V to class T. - * - * @param v Item to convert. - * @return Converted item. - */ - protected abstract T convert(V v); - - /** - * Removes item. - * - * @param item Item to remove. - */ - protected abstract void remove(T item); /** * Iterator based of {@link GridCacheQueryFuture}. + * + * @param Type for iterator. */ - public class WeakQueryFutureIterator extends GridCloseableIteratorAdapter { + public class WeakQueryFutureIterator extends GridCloseableIteratorAdapter { /** Query future. */ private final GridCacheQueryFuture fut; /** Weak reference. */ - private final WeakReference weakRef; + private final WeakReference> weakRef; + + CacheIteratorConverter convert; /** Init flag. */ private boolean init; @@ -146,10 +136,12 @@ public class WeakQueryFutureIterator extends GridCloseableIteratorAdapter { /** * @param fut GridCacheQueryFuture to iterate. */ - WeakQueryFutureIterator(GridCacheQueryFuture fut) { + WeakQueryFutureIterator(GridCacheQueryFuture fut, CacheIteratorConverter convert) { this.fut = fut; this.weakRef = new WeakReference<>(this, refQueue); + + this.convert = convert; } /** {@inheritDoc} */ @@ -169,7 +161,7 @@ public class WeakQueryFutureIterator extends GridCloseableIteratorAdapter { if (futNext == null) clearWeakReference(); - next = futNext != null ? convert(futNext) : null; + next = futNext != null ? convert.convert(futNext) : null; return cur; } @@ -198,7 +190,7 @@ public class WeakQueryFutureIterator extends GridCloseableIteratorAdapter { if (cur == null) throw new IllegalStateException(); - CacheWeakQueryIteratorsHolder.this.remove(cur); + convert.remove(cur); cur = null; } @@ -206,7 +198,7 @@ public class WeakQueryFutureIterator extends GridCloseableIteratorAdapter { /** * @return Iterator weak reference. */ - private WeakReference weakReference() { + private WeakReference> weakReference() { return weakRef; } @@ -226,7 +218,7 @@ private void init() throws IgniteCheckedException { if (!init) { V futNext = fut.next(); - next = futNext != null ? convert(futNext) : null; + next = futNext != null ? convert.convert(futNext) : null; init = true; } diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java index ad65344543054..cde739b525bf0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java @@ -186,6 +186,9 @@ public class GridCacheContext implements Externalizable { /** Default expiry policy. */ private ExpiryPolicy expiryPlc; + /** Cache weak query iterator holder. */ + private CacheWeakQueryIteratorsHolder> itHolder; + /** * Empty constructor required for {@link Externalizable}. */ @@ -300,6 +303,8 @@ public GridCacheContext( if (expiryPlc instanceof EternalExpiryPolicy) expiryPlc = null; + + itHolder = new CacheWeakQueryIteratorsHolder(log); } /** @@ -836,6 +841,13 @@ public GridCacheContinuousQueryManager continuousQueries() { return contQryMgr; } + /** + * @return Iterators Holder. + */ + public CacheWeakQueryIteratorsHolder> itHolder() { + return itHolder; + } + /** * @return Swap manager. */ diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java index cacc9334d8bb4..852aa217ceab8 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java @@ -66,9 +66,6 @@ public class GridCacheSetImpl extends AbstractCollection implements GridCa /** Removed flag. */ private volatile boolean rmvd; - /** Iterator holder. */ - private final CacheWeakQueryIteratorsHolder> itHolder; - /** * @param ctx Cache context. * @param name Set name. @@ -84,16 +81,6 @@ public GridCacheSetImpl(GridCacheContext ctx, String name, GridCacheSetHeader hd cache = ctx.cache(); hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name)); - - itHolder = new CacheWeakQueryIteratorsHolder>(ctx.logger(GridCacheSetImpl.class)) { - @Override protected T convert(Map.Entry e) { - return e.getKey(); - } - - @Override protected void remove(T item) { - GridCacheSetImpl.this.remove(item); - } - }; } /** {@inheritDoc} */ @@ -348,10 +335,21 @@ private GridCloseableIterator iterator0() { qry.projection(ctx.grid().forNodes(nodes)); - CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it = itHolder.iterator(qry.execute()); + GridCacheQueryFuture> fut = qry.execute(); + + CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it = + ctx.itHolder().iterator(fut, new CacheIteratorConverter>() { + @Override protected T convert(Map.Entry e) { + return e.getKey(); + } + + @Override protected void remove(T item) { + GridCacheSetImpl.this.remove(item); + } + }); if (rmvd) { - itHolder.removeIterator(it); + ctx.itHolder().removeIterator(it); checkRemoved(); } @@ -438,7 +436,7 @@ void removed(boolean rmvd) { this.rmvd = rmvd; if (rmvd) - itHolder.clearQueries(); + ctx.itHolder().clearQueries(); } /** @@ -453,7 +451,7 @@ private void checkRemoved() { * Checks if set was removed and handles iterators weak reference queue. */ private void onAccess() { - itHolder.checkWeakQueue(); + ctx.itHolder().checkWeakQueue(); checkRemoved(); }