diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java index 24fd33f6f2fee..d4a13ac4d37d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java @@ -380,50 +380,6 @@ public interface CacheProjection extends Iterable> { */ public IgniteInternalFuture reloadAsync(K key); - /** - * Reloads all currently cached keys form persistent storage. - *

Transactions

- * This method does not participate in transactions, however it does not violate - * cache integrity and can be used safely with or without transactions. - * - * @throws IgniteCheckedException If reloading failed. - */ - public void reloadAll() throws IgniteCheckedException; - - /** - * Asynchronously reloads all specified entries from underlying - * persistent storage. - *

Transactions

- * This method does not participate in transactions, however it does not violate - * cache integrity and can be used safely with or without transactions. - * - * @return Future which will complete whenever {@code reload} completes. - */ - public IgniteInternalFuture reloadAllAsync(); - - /** - * Reloads specified entries from underlying persistent storage. - *

Transactions

- * This method does not participate in transactions, however it does not violate - * cache integrity and can be used safely with or without transactions. - * - * @param keys Keys to reload. - * @throws IgniteCheckedException if reloading failed. - */ - public void reloadAll(@Nullable Collection keys) throws IgniteCheckedException; - - /** - * Asynchronously reloads all specified entries from underlying - * persistent storage. - *

Transactions

- * This method does not participate in transactions, however it does not violate - * cache integrity and can be used safely with or without transactions. - * - * @param keys Keys to reload. - * @return Future which will complete whenever {@code reload} completes. - */ - public IgniteInternalFuture reloadAllAsync(@Nullable Collection keys); - /** * Peeks at in-memory cached value using default {@link GridCachePeekMode#SMART} * peek mode. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index f598b3522bc28..ef7ec5187a13e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1022,7 +1022,7 @@ else if (m == DB) { } if (val != null) - return F.t((V)val.get().value(ctx.cacheObjectContext(), true)); + return F.t(CU.value(val.get(), ctx, true)); } } catch (GridCacheEntryRemovedException ignore) { @@ -1567,25 +1567,6 @@ public V applyx(IgniteInternalFuture> e) throws IgniteCheckedException return getAllAsync(keys, !ctx.config().isReadFromBackup(), /*skip tx*/true, null, null, taskName, true, false); } - /** {@inheritDoc} */ - @Override public void reloadAll(@Nullable Collection keys) throws IgniteCheckedException { - reloadAll(keys, false, false); - } - - /** {@inheritDoc} */ - @Override public void reloadAll() throws IgniteCheckedException { - ctx.denyOnFlags(F.asList(LOCAL, READ)); - - reloadAll(keySet()); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture reloadAllAsync() { - ctx.denyOnFlags(F.asList(LOCAL, READ)); - - return reloadAllAsync(keySet()); - } - /** * @param keys Keys. * @param reload Reload flag. @@ -1616,190 +1597,6 @@ public IgniteInternalFuture readThroughAllAsync(final Collection reloadAll(@Nullable Collection keys, boolean ret, boolean skipVals) - throws IgniteCheckedException { - UUID subjId = ctx.subjectIdPerCall(null); - - String taskName = ctx.kernalContext().job().currentTaskName(); - - return reloadAllAsync(keys, ret, skipVals, subjId, taskName).get(); - } - - /** - * @param keys Keys. - * @param ret Return flag. - * @return Future. - */ - public IgniteInternalFuture> reloadAllAsync(@Nullable Collection keys, - boolean ret, - boolean skipVals, - @Nullable UUID subjId, - String taskName) - { - ctx.denyOnFlag(READ); - - final long topVer = ctx.affinity().affinityTopologyVersion(); - - // TODO IGNITE-51. - List cacheKeys = new ArrayList<>(keys.size()); - - if (!F.isEmpty(keys)) { - final String uid = CU.uuid(); // Get meta UUID for this thread. - - assert keys != null; - - if (keyCheck) - validateCacheKeys(keys); - - for (K key : keys) { - if (key == null) - continue; - - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - - cacheKeys.add(cacheKey); - - // Skip primary or backup entries for near cache. - if (ctx.isNear() && ctx.affinity().localNode(cacheKey, topVer)) - continue; - - while (true) { - try { - GridCacheEntryEx entry = entryExSafe(cacheKey, topVer); - - if (entry == null) - break; - - // Get version before checking filer. - GridCacheVersion ver = entry.version(); - - // Tag entry with current version. - entry.addMeta(uid, ver); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry for reload (will retry): " + key); - } - catch (GridDhtInvalidPartitionException ignore) { - if (log.isDebugEnabled()) - log.debug("Got invalid partition for key (will skip): " + key); - - break; - } - } - } - - final Map map = ret ? new HashMap(keys.size(), 1.0f) : null; - - final Collection absentKeys = F.view(cacheKeys, CU.keyHasMeta(ctx, uid)); - - final Collection loadedKeys = new GridConcurrentHashSet<>(); - - IgniteInternalFuture readFut = - readThroughAllAsync(absentKeys, true, skipVals, null, subjId, taskName, new CI2() { - /** Version for all loaded entries. */ - private GridCacheVersion nextVer = ctx.versions().next(); - - /** {@inheritDoc} */ - @Override public void apply(KeyCacheObject key, Object val) { - loadedKeys.add(key); - - GridCacheEntryEx entry = peekEx(key); - - if (entry != null) { - try { - GridCacheVersion curVer = entry.removeMeta(uid); - - // If entry passed the filter. - if (curVer != null) { - boolean wasNew = entry.isNewLocked(); - - entry.unswap(); - - CacheObject cacheVal = ctx.toCacheObject(val); - - boolean set = entry.versionedValue(cacheVal, curVer, nextVer); - - ctx.evicts().touch(entry, topVer); - - if (map != null) { - if (set || wasNew) - map.put(key.value(ctx.cacheObjectContext(), false), (V)val); - else { - try { - // TODO IGNITE-51. - K k = key.value(ctx.cacheObjectContext(), false); - - GridTuple v = peek0(false, k, GLOBAL); - - if (v != null) - map.put(k, v.get()); - } - catch (GridCacheFilterFailedException ex) { - ex.printStackTrace(); - - assert false; - } - } - } - - if (log.isDebugEnabled()) { - log.debug("Set value loaded from store into entry [set=" + set + ", " + - "curVer=" + - curVer + ", newVer=" + nextVer + ", entry=" + entry + ']'); - } - } - else { - if (log.isDebugEnabled()) { - log.debug("Current version was not found (either entry was removed or " + - "validation was not passed: " + entry); - } - } - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) { - log.debug("Got removed entry for reload (will not store reloaded entry) " + - "[entry=" + entry + ']'); - } - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - } - }); - - return readFut.chain(new CX1, Map>() { - @Override public Map applyx(IgniteInternalFuture e) throws IgniteCheckedException { - // Touch all not loaded keys. - for (KeyCacheObject key : absentKeys) { - if (!loadedKeys.contains(key)) { - GridCacheEntryEx entry = peekEx(key); - - if (entry != null) - ctx.evicts().touch(entry, topVer); - } - } - - // Make sure there were no exceptions. - e.get(); - - return map; - } - }); - } - - return new GridFinishedFuture<>(ctx.kernalContext(), Collections.emptyMap()); - } - /** * @param keys Keys. * @param ret Return flag. @@ -5207,20 +5004,6 @@ public Map getAll(Collection keys, boolean deserializePortabl } } - /** - * @param keys Keys. - * @return Reload future. - */ - @Override public IgniteInternalFuture reloadAllAsync(@Nullable Collection keys) { - UUID subjId = ctx.subjectIdPerCall(null); - - String taskName = ctx.kernalContext().job().currentTaskName(); - - - - return reloadAllAsync(keys, false, false, subjId, taskName); - } - /** * @param key Key. * @return Reload future. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index 86ac962649030..342402c57db9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -479,26 +479,6 @@ boolean isAll(K k, V v) { return cache.reloadAsync(key); } - /** {@inheritDoc} */ - @Override public void reloadAll() throws IgniteCheckedException { - cache.reloadAll(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture reloadAllAsync() { - return cache.reloadAllAsync(); - } - - /** {@inheritDoc} */ - @Override public void reloadAll(@Nullable Collection keys) throws IgniteCheckedException { - cache.reloadAll(keys); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture reloadAllAsync(@Nullable Collection keys) { - return cache.reloadAllAsync(keys); - } - /** {@inheritDoc} */ @Override public V get(K key) throws IgniteCheckedException { return cache.get(key, deserializePortables()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 88af3ddedee0b..a922d59b9892d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -413,54 +413,6 @@ public GridCacheProjectionImpl gateProjection() { } } - /** {@inheritDoc} */ - @Override public void reloadAll() throws IgniteCheckedException { - GridCacheProjectionImpl prev = gate.enter(prj); - - try { - delegate.reloadAll(); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture reloadAllAsync() { - GridCacheProjectionImpl prev = gate.enter(prj); - - try { - return delegate.reloadAllAsync(); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ - @Override public void reloadAll(@Nullable Collection keys) throws IgniteCheckedException { - GridCacheProjectionImpl prev = gate.enter(prj); - - try { - delegate.reloadAll(keys); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture reloadAllAsync(@Nullable Collection keys) { - GridCacheProjectionImpl prev = gate.enter(prj); - - try { - return delegate.reloadAllAsync(keys); - } - finally { - gate.leave(prev); - } - } - /** {@inheritDoc} */ @Nullable @Override public V get(K key) throws IgniteCheckedException { GridCacheProjectionImpl prev = gate.enter(prj); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 71dd83c984103..ce8c99585f7a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -203,27 +203,6 @@ public boolean isAllLockedNearOnly(Iterable keys) { skipVals); } - /** {@inheritDoc} */ - @Override public void reloadAll(@Nullable Collection keys) throws IgniteCheckedException { - dht().reloadAll(keys); - - super.reloadAll(keys); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture reloadAllAsync(@Nullable Collection keys) { - GridCompoundFuture fut = new GridCompoundFuture(ctx.kernalContext()); - - fut.add(super.reloadAllAsync(keys)); - fut.add(dht().reloadAllAsync(keys)); - - fut.markInitialized(); - - return fut; - - } - /** {@inheritDoc} */ @Override public V reload(K key) throws IgniteCheckedException { @@ -241,26 +220,6 @@ public boolean isAllLockedNearOnly(Iterable keys) { return val == null ? nearVal : val; } - /** {@inheritDoc} */ - @Override public void reloadAll() throws IgniteCheckedException { - super.reloadAll(); - - dht().reloadAll(); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked"}) - @Override public IgniteInternalFuture reloadAllAsync() { - GridCompoundFuture fut = new GridCompoundFuture(ctx.kernalContext()); - - fut.add(super.reloadAllAsync()); - fut.add(dht().reloadAllAsync()); - - fut.markInitialized(); - - return fut; - } - /** * @param tx Transaction. * @param keys Keys to load. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java index 37b5e3ad0b568..bce307251f1d4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java @@ -19,6 +19,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; @@ -32,6 +33,7 @@ import org.jdk8.backport.*; import javax.cache.configuration.*; +import javax.cache.integration.*; import java.util.*; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; @@ -56,7 +58,7 @@ public abstract class GridCachePartitionedReloadAllAbstractSelfTest extends Grid private final Map map = new ConcurrentHashMap8<>(); /** Collection of caches, one per grid node. */ - private List> caches; + private List> caches; /** {@inheritDoc} */ @SuppressWarnings("unchecked") @@ -130,7 +132,7 @@ protected CacheAtomicWriteOrderMode atomicWriteOrderMode() { caches = new ArrayList<>(GRID_CNT); for (int i = 0; i < GRID_CNT; i++) - caches.add(((IgniteKernal)startGrid(i)).cache(null)); + caches.add(startGrid(i).jcache(null)); awaitPartitionMapExchange(); } @@ -180,15 +182,12 @@ protected CacheAtomicWriteOrderMode atomicWriteOrderMode() { } /** - * Ensure that reloadAll() with disabled near cache reloads data only on a node - * on which reloadAll() has been called. - * * @throws Exception If test failed. */ public void testReloadAll() throws Exception { // Fill caches with values. - for (GridCache cache : caches) { - Iterable keys = primaryKeysForCache(cache, 100); + for (IgniteCache cache : caches) { + Iterable keys = primaryKeys(cache, 100); info("Values [cache=" + caches.indexOf(cache) + ", size=" + F.size(keys.iterator()) + ", keys=" + keys + "]"); @@ -196,53 +195,22 @@ public void testReloadAll() throws Exception { map.put(key, "val" + key); } - Collection> emptyCaches = new ArrayList<>(caches); + CompletionListenerFuture fut = new CompletionListenerFuture(); - for (GridCache cache : caches) { - info("Reloading cache: " + caches.indexOf(cache)); + caches.get(0).loadAll(map.keySet(), false, fut); - // Check data is reloaded only on the nodes on which reloadAll() has been called. - if (!nearEnabled()) { - for (GridCache eCache : emptyCaches) - assertEquals("Non-null values found in cache [cache=" + caches.indexOf(eCache) + - ", size=" + eCache.size() + ", size=" + eCache.size() + - ", entrySetSize=" + eCache.entrySet().size() + "]", - 0, eCache.size()); - } + fut.get(); - cache.reloadAll(map.keySet()); + CacheAffinity aff = ignite(0).affinity(null); + for (IgniteCache cache : caches) { for (Integer key : map.keySet()) { - if (cache.affinity().isPrimaryOrBackup(grid(caches.indexOf(cache)).localNode(), key) || + if (aff.isPrimaryOrBackup(grid(caches.indexOf(cache)).localNode(), key) || nearEnabled()) - assertEquals(map.get(key), cache.peek(key)); + assertEquals(map.get(key), cache.localPeek(key)); else - assertNull(cache.peek(key)); + assertNull(cache.localPeek(key)); } - - emptyCaches.remove(cache); } } - - /** - * Create list of keys for which the given cache is primary. - * - * @param cache Cache. - * @param cnt Keys count. - * @return Collection of keys for which given cache is primary. - */ - private Iterable primaryKeysForCache(GridCache cache, int cnt) { - Collection found = new ArrayList<>(cnt); - - for (int i = 0; i < 10000; i++) { - if (cache.affinity().isPrimary(grid(caches.indexOf(cache)).localNode(), i)) { - found.add(i); - - if (found.size() == cnt) - return found; - } - } - - throw new IllegalStateException("Unable to find " + cnt + " keys as primary for cache."); - } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryLoadSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryLoadSelfTest.java index 7cd978566f0e4..78115e3001759 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryLoadSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryLoadSelfTest.java @@ -33,8 +33,8 @@ import org.apache.ignite.testframework.junits.common.*; import org.jetbrains.annotations.*; -import javax.cache.*; import javax.cache.configuration.*; +import javax.cache.integration.*; import java.io.*; import java.util.*; @@ -254,48 +254,11 @@ public void testReloadAll() throws Exception { for (int i = 0; i < PUT_CNT - 5; i++) keys[i] = i + 5; - cache.reloadAll(F.asList(keys)); + CompletionListenerFuture fut = new CompletionListenerFuture(); - assert cache.size() == PUT_CNT - 5; - - Collection> res = - cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get(); - - assert res != null; - assert res.size() == PUT_CNT - 5; - assert size(ValueObject.class) == PUT_CNT - 5; - - cache.clear(); - - assert cache.isEmpty(); - assertEquals(0, cache.size()); - - cache.reloadAll(Arrays.asList(keys)); - - assertEquals(PUT_CNT - 5, cache.size()); - - res = cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get(); - - assert res != null; - assert res.size() == PUT_CNT - 5; - assert size(ValueObject.class) == PUT_CNT - 5; - } + grid().jcache(null).loadAll(F.asSet(keys), true, fut); - /** - * @throws Exception If failed. - */ - public void testReloadAllAsync() throws Exception { - for (int i = 0; i < PUT_CNT; i++) - STORE_MAP.put(i, new ValueObject(i)); - - GridCache cache = cache(); - - Integer[] keys = new Integer[PUT_CNT - 5]; - - for (int i = 0; i < PUT_CNT - 5; i++) - keys[i] = i + 5; - - cache.reloadAllAsync(F.asList(keys)).get(); + fut.get(); assert cache.size() == PUT_CNT - 5; @@ -311,7 +274,11 @@ public void testReloadAllAsync() throws Exception { assert cache.isEmpty(); assertEquals(0, cache.size()); - cache.reloadAllAsync(Arrays.asList(keys)).get(); + fut = new CompletionListenerFuture(); + + grid().jcache(null).loadAll(F.asSet(keys), true, fut); + + fut.get(); assertEquals(PUT_CNT - 5, cache.size());