From 854664a751857263af4b307442c24720687b8789 Mon Sep 17 00:00:00 2001 From: sboikov Date: Tue, 20 Jan 2015 18:34:50 +0300 Subject: [PATCH] # ignite-42 --- .../processors/cache/GridCacheAdapter.java | 258 ++++++++++++------ .../cache/GridCacheStoreManager.java | 2 +- .../distributed/dht/GridDhtCacheAdapter.java | 109 +++++--- .../near/GridNearCacheAdapter.java | 5 + .../cache/IgniteCacheAbstractTest.java | 6 + .../IgniteCacheAtomicLoadAllTest.java | 55 ++++ .../IgniteCacheAtomicLoaderWriterTest.java | 2 +- .../IgniteCacheAtomicLocalLoadAllTest.java | 49 ++++ .../IgniteCacheLoadAllAbstractTest.java | 190 +++++++++++-- .../IgniteCacheLoaderWriterAbstractTest.java | 219 ++++++++++++++- .../integration/IgniteCacheTxLoadAllTest.java | 50 ++++ .../IgniteCacheTxLoaderWriterTest.java | 49 ++++ .../IgniteCacheTxLocalLoadAllTest.java | 49 ++++ ...abledTxOriginatingNodeFailureSelfTest.java | 1 - .../bamboo/GridDataGridTestSuite.java | 9 + 15 files changed, 894 insertions(+), 159 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicLoadAllTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicLocalLoadAllTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxLoadAllTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxLoaderWriterTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxLocalLoadAllTest.java diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index 5a3503351fbf8..e08653e3a1f42 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -1379,11 +1379,10 @@ private boolean evictx(K key, GridCacheVersion ver, return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null, taskName, true).chain(new CX1>, V>() { - @Override - public V applyx(IgniteFuture> e) throws IgniteCheckedException { - return e.get().get(key); - } - }); + @Override public V applyx(IgniteFuture> e) throws IgniteCheckedException { + return e.get().get(key); + } + }); } /** {@inheritDoc} */ @@ -3385,7 +3384,7 @@ public IgniteFuture loadAll(final Set keys, if (keys.size() < 10) { for (K key : keys) { if (key == null) - throw new NullPointerException(); + throw new NullPointerException("Key to load is null."); } } @@ -3394,14 +3393,17 @@ public IgniteFuture loadAll(final Set keys, if (replaceExisting) { if (ctx.store().isLocalStore()) { - assert false; + Collection nodes = ctx.grid().forCache(name()).nodes(); - return null; + return ctx.closures().callAsyncNoFailover(BROADCAST, + new LoadKeysCallable<>(ctx.name(), keys, true), + nodes, + true); } else { return ctx.closures().callLocalSafe(new Callable() { @Override public Void call() throws Exception { - loadAll(keys); + localLoadAndUpdate(keys); return null; } @@ -3409,62 +3411,12 @@ public IgniteFuture loadAll(final Set keys, } } else { - return ctx.closures().callLocalSafe(new Callable() { - @Override public Void call() throws Exception { - // Version for all loaded entries. - final GridCacheVersion ver0 = ctx.versions().nextForLoad(); - final boolean replicate = ctx.isDrEnabled(); - final long topVer = ctx.affinity().affinityTopologyVersion(); - - ctx.store().loadAllFromStore(null, keys, new CIX2() { - @Override public void applyx(K key, V val) - throws PortableException { - if (ctx.portableEnabled()) { - key = (K)ctx.marshalToPortable(key); - val = (V)ctx.marshalToPortable(val); - } + Collection nodes = ctx.grid().forCache(name()).nodes(); - GridCacheEntryEx entry = entryEx(key, false); - - try { - entry.initialValue(val, null, ver0, 0, -1, false, topVer, replicate ? DR_LOAD : DR_NONE); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to put cache value: " + entry, e); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry during loadCache (will ignore): " + entry); - } - finally { - ctx.evicts().touch(entry, topVer); - } - - CU.unwindEvicts(ctx); - } - }); - - return null; - } - }); - } - } - - /** - * @param keys Keys. - * @throws IgniteCheckedException If failed. - */ - private void loadAllLocalStore(final Set keys) throws IgniteCheckedException { - assert ctx.store().isLocalStore(); - - try (final IgniteDataLoader ldr = ctx.kernalContext().dataLoad().dataLoader(ctx.namex(), false)) { - ldr.updater(new GridDrDataLoadCacheUpdater()); - - LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, 0); - - ctx.store().loadAllFromLocalStore(null, keys, c); - - c.onDone(); + return ctx.closures().callAsyncNoFailover(BROADCAST, + new LoadKeysCallable<>(ctx.name(), keys, false), + nodes, + true); } } @@ -3472,7 +3424,7 @@ private void loadAllLocalStore(final Set keys) throws IgniteChecked * @param keys Keys. * @throws IgniteCheckedException If failed. */ - private void loadAll(final Set keys) throws IgniteCheckedException { + private void localLoadAndUpdate(final Collection keys) throws IgniteCheckedException { try (final IgniteDataLoader ldr = ctx.kernalContext().dataLoad().dataLoader(ctx.namex(), false)) { final Collection> col = new ArrayList<>(ldr.perNodeBufferSize()); @@ -3498,6 +3450,37 @@ private void loadAll(final Set keys) throws IgniteCheckedException } } + /** + * @param keys Keys to load. + * @throws IgniteCheckedException If failed. + */ + public void localLoad(Collection keys) throws IgniteCheckedException { + final boolean replicate = ctx.isDrEnabled(); + final long topVer = ctx.affinity().affinityTopologyVersion(); + + if (ctx.store().isLocalStore()) { + try (final IgniteDataLoader ldr = ctx.kernalContext().dataLoad().dataLoader(ctx.namex(), false)) { + ldr.updater(new GridDrDataLoadCacheUpdater()); + + LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, 0); + + ctx.store().localStoreLoadAll(null, keys, c); + + c.onDone(); + } + } + else { + // Version for all loaded entries. + final GridCacheVersion ver0 = ctx.versions().nextForLoad(); + + ctx.store().loadAllFromStore(null, keys, new CI2() { + @Override public void apply(K key, V val) { + loadEntry(key, val, ver0, null, topVer, replicate, 0); + } + }); + } + } + /** {@inheritDoc} */ @Override public void loadCache(final IgniteBiPredicate p, final long ttl, Object[] args) throws IgniteCheckedException { @@ -3520,38 +3503,58 @@ private void loadAll(final Set keys) throws IgniteCheckedException final GridCacheVersion ver0 = ctx.versions().nextForLoad(); ctx.store().loadCache(new CIX3() { - @Override public void applyx(K key, V val, @Nullable GridCacheVersion ver) + @Override + public void applyx(K key, V val, @Nullable GridCacheVersion ver) throws PortableException { assert ver == null; - if (p != null && !p.apply(key, val)) - return; + loadEntry(key, val, ver0, p, topVer, replicate, ttl); + } + }, args); + } + } - if (ctx.portableEnabled()) { - key = (K)ctx.marshalToPortable(key); - val = (V)ctx.marshalToPortable(val); - } + /** + * @param key Key. + * @param val Value. + * @param ver Cache version. + * @param p Optional predicate. + * @param topVer Topology version. + * @param replicate Replication flag. + * @param ttl TTL. + */ + private void loadEntry(K key, + V val, + GridCacheVersion ver, + @Nullable IgniteBiPredicate p, + long topVer, + boolean replicate, + long ttl) { + if (p != null && !p.apply(key, val)) + return; - GridCacheEntryEx entry = entryEx(key, false); + if (ctx.portableEnabled()) { + key = (K)ctx.marshalToPortable(key); + val = (V)ctx.marshalToPortable(val); + } - try { - entry.initialValue(val, null, ver0, ttl, -1, false, topVer, replicate ? DR_LOAD : DR_NONE); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to put cache value: " + entry, e); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry during loadCache (will ignore): " + entry); - } - finally { - ctx.evicts().touch(entry, topVer); - } + GridCacheEntryEx entry = entryEx(key, false); - CU.unwindEvicts(ctx); - } - }, args); + try { + entry.initialValue(val, null, ver, ttl, -1, false, topVer, replicate ? DR_LOAD : DR_NONE); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to put cache value: " + entry, e); + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during loadCache (will ignore): " + entry); + } + finally { + ctx.evicts().touch(entry, topVer); } + + CU.unwindEvicts(ctx); } /** {@inheritDoc} */ @@ -5191,6 +5194,83 @@ public GetExpiryPolicy(long accessTtl) { } } + /** + * + */ + static class LoadKeysCallable implements IgniteCallable, Externalizable{ + /** */ + private static final long serialVersionUID = 0L; + + /** Cache name. */ + private String cacheName; + + /** Injected grid instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Keys to load. */ + private Collection keys; + + /** Update flag. */ + private boolean update; + + /** + * Required by {@link Externalizable}. + */ + public LoadKeysCallable() { + // No-op. + } + + /** + * @param cacheName Cache name. + * @param keys Keys. + * @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)} + * otherwise {@link #localLoad(Collection)}. + */ + LoadKeysCallable(String cacheName, Collection keys, boolean update) { + this.cacheName = cacheName; + this.keys = keys; + this.update = update; + } + + /** {@inheritDoc} */ + @Override public Void call() throws Exception { + GridCacheAdapter cache = ((GridKernal)ignite).context().cache().internalCache(cacheName); + + cache.context().gate().enter(); + + try { + if (update) + cache.localLoadAndUpdate(keys); + else + cache.localLoad(keys); + } + finally { + cache.context().gate().leave(); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + + U.writeCollection(out, keys); + + out.writeBoolean(update); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + + keys = U.readCollection(in); + + update = in.readBoolean(); + } + } + /** * */ diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java index 11e11a3d07549..aeb9110f58203 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java @@ -294,7 +294,7 @@ public boolean writeToStoreFromDht() { * @param vis Closer to cache loaded elements. * @throws IgniteCheckedException If data loading failed. */ - public void loadAllFromLocalStore(@Nullable IgniteTx tx, + public void localStoreLoadAll(@Nullable IgniteTx tx, Collection keys, final GridInClosure3 vis) throws IgniteCheckedException { assert store != null; diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 22e1563fe68f0..f9a0f82555def 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -355,6 +355,28 @@ public GridCacheEntryEx entryExx(K key, long topVer, boolean allowDetached } } + /** {@inheritDoc} */ + @Override public void localLoad(Collection keys) throws IgniteCheckedException { + if (ctx.store().isLocalStore()) { + super.localLoad(keys); + + return; + } + + // Version for all loaded entries. + final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topology().topologyVersion()); + + final boolean replicate = ctx.isDrEnabled(); + + final long topVer = ctx.affinity().affinityTopologyVersion(); + + ctx.store().loadAllFromStore(null, keys, new CI2() { + @Override public void apply(K key, V val) { + loadEntry(key, val, ver0, null, topVer, replicate, 0); + } + }); + } + /** {@inheritDoc} */ @Override public void loadCache(final IgniteBiPredicate p, final long ttl, Object[] args) throws IgniteCheckedException { if (ctx.store().isLocalStore()) { @@ -374,50 +396,69 @@ public GridCacheEntryEx entryExx(K key, long topVer, boolean allowDetached @Override public void apply(K key, V val, @Nullable GridCacheVersion ver) { assert ver == null; - if (p != null && !p.apply(key, val)) - return; + loadEntry(key, val, ver0, p, topVer, replicate, ttl); + } + }, args); + } - try { - GridDhtLocalPartition part = top.localPartition(ctx.affinity().partition(key), -1, true); + /** + * @param key Key. + * @param val Value. + * @param ver Cache version. + * @param p Optional predicate. + * @param topVer Topology version. + * @param replicate Replication flag. + * @param ttl TTL. + */ + private void loadEntry(K key, + V val, + GridCacheVersion ver, + @Nullable IgniteBiPredicate p, + long topVer, + boolean replicate, + long ttl) { + if (p != null && !p.apply(key, val)) + return; - // Reserve to make sure that partition does not get unloaded. - if (part.reserve()) { - GridCacheEntryEx entry = null; + try { + GridDhtLocalPartition part = top.localPartition(ctx.affinity().partition(key), -1, true); - try { - if (ctx.portableEnabled()) { - key = (K)ctx.marshalToPortable(key); - val = (V)ctx.marshalToPortable(val); - } + // Reserve to make sure that partition does not get unloaded. + if (part.reserve()) { + GridCacheEntryEx entry = null; - entry = entryEx(key, false); + try { + if (ctx.portableEnabled()) { + key = (K)ctx.marshalToPortable(key); + val = (V)ctx.marshalToPortable(val); + } - entry.initialValue(val, null, ver0, ttl, -1, false, topVer, replicate ? DR_LOAD : DR_NONE); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to put cache value: " + entry, e); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry during loadCache (will ignore): " + entry); - } - finally { - if (entry != null) - entry.context().evicts().touch(entry, topVer); + entry = entryEx(key, false); - part.release(); - } - } - else if (log.isDebugEnabled()) - log.debug("Will node load entry into cache (partition is invalid): " + part); + entry.initialValue(val, null, ver, ttl, -1, false, topVer, replicate ? DR_LOAD : DR_NONE); } - catch (GridDhtInvalidPartitionException e) { + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to put cache value: " + entry, e); + } + catch (GridCacheEntryRemovedException ignore) { if (log.isDebugEnabled()) - log.debug("Ignoring entry for partition that does not belong [key=" + key + ", val=" + val + - ", err=" + e + ']'); + log.debug("Got removed entry during loadCache (will ignore): " + entry); + } + finally { + if (entry != null) + entry.context().evicts().touch(entry, topVer); + + part.release(); } } - }, args); + else if (log.isDebugEnabled()) + log.debug("Will node load entry into cache (partition is invalid): " + part); + } + catch (GridDhtInvalidPartitionException e) { + if (log.isDebugEnabled()) + log.debug("Ignoring entry for partition that does not belong [key=" + key + ", val=" + val + + ", err=" + e + ']'); + } } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java index efcd6dd458cc9..6617dc6854cb9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -311,6 +311,11 @@ public IgniteFuture> loadAsync(@Nullable IgniteTxEx tx, dht().loadCache(p, ttl, args); } + /** {@inheritDoc} */ + @Override public void localLoad(Collection keys) throws IgniteCheckedException { + dht().localLoad(keys); + } + /** {@inheritDoc} */ @Override public IgniteFuture loadCacheAsync(IgniteBiPredicate p, long ttl, Object[] args) { return dht().loadCacheAsync(p, ttl, args); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java index b6bf1e3aa9b36..bf8206cf2968a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java @@ -118,8 +118,14 @@ protected CacheConfiguration cacheConfiguration(String gridName) throws Exceptio cfg.setCacheLoaderFactory(loaderFactory()); + if (cfg.getCacheLoaderFactory() != null) + cfg.setReadThrough(true); + cfg.setCacheWriterFactory(writerFactory()); + if (cfg.getCacheWriterFactory() != null) + cfg.setWriteThrough(true); + CacheStore store = cacheStore(); if (store != null) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicLoadAllTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicLoadAllTest.java new file mode 100644 index 0000000000000..dea9bfbc4d90a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicLoadAllTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.integration; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*; +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheAtomicLoadAllTest extends IgniteCacheLoadAllAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() { + return PRIMARY; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicLoaderWriterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicLoaderWriterTest.java index a63133af1cc80..c0a9bb7555f15 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicLoaderWriterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicLoaderWriterTest.java @@ -45,7 +45,7 @@ public class IgniteCacheAtomicLoaderWriterTest extends IgniteCacheLoaderWriterAb /** {@inheritDoc} */ @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() { - return CLOCK; + return PRIMARY; } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicLocalLoadAllTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicLocalLoadAllTest.java new file mode 100644 index 0000000000000..26075e68903e8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheAtomicLocalLoadAllTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.integration; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheAtomicLocalLoadAllTest extends IgniteCacheLoadAllAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return LOCAL; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoadAllAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoadAllAbstractTest.java index 6ee5708534e09..75c0a2742eed7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoadAllAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoadAllAbstractTest.java @@ -18,68 +18,218 @@ package org.apache.ignite.internal.processors.cache.integration; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.gridgain.grid.cache.affinity.*; +import org.jdk8.backport.*; +import javax.cache.Cache; +import javax.cache.configuration.*; import javax.cache.integration.*; import java.util.*; /** - * Test for {@link javax.cache.Cache#loadAll(Set, boolean, CompletionListener)}. + * Test for {@link Cache#loadAll(Set, boolean, CompletionListener)}. */ public abstract class IgniteCacheLoadAllAbstractTest extends IgniteCacheAbstractTest { + /** */ + private volatile boolean writeThrough = true; + + /** */ + private ConcurrentHashMap8 storeMap = new ConcurrentHashMap8<>(); + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration ccfg = super.cacheConfiguration(gridName); + + ccfg.setWriteThrough(writeThrough); + + ccfg.setCacheLoaderFactory(new Factory() { + @Override public CacheLoader create() { + return new CacheLoader() { + @Override public Object load(Object key) throws CacheLoaderException { + return storeMap.get(key); + } + + @Override public Map loadAll(Iterable keys) throws CacheLoaderException { + Map loaded = new HashMap<>(); + + for (Object key : keys) { + Object val = storeMap.get(key); + + if (val != null) + loaded.put(key, val); + } + + return loaded; + } + }; + } + }); + + ccfg.setCacheWriterFactory(new Factory() { + @Override public CacheWriter create() { + return new CacheWriter() { + @Override public void write(Cache.Entry e) { + storeMap.put(e.getKey(), e.getValue()); + } + + @Override public void writeAll(Collection> entries) { + for (Cache.Entry e : entries) + write(e); + } + + @Override public void delete(Object key) { + storeMap.remove(key); + } + + @Override public void deleteAll(Collection keys) { + for (Object key : keys) + delete(key); + } + }; + } + }); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + storeMap = null; + } + /** * @throws Exception If failed. */ public void testLoadAll() throws Exception { - IgniteCache cache = jcache(0); + IgniteCache cache0 = jcache(0); + + // Put some data in cache, it also should be put in store. + + final int KEYS = 10; - for (int i = 0; i < 1000; i++) - cache.put(i, String.valueOf(i)); + for (int i = 0; i < KEYS; i++) + cache0.put(i, String.valueOf(i)); + + // Restart nodes with write-through disabled so that data in store does not change. stopAllGrids(); + writeThrough = false; + startGrids(); - cache = jcache(0); + cache0 = jcache(0); Set keys = new HashSet<>(); - for (int i = 0; i < 100; i++) + Map expVals = new HashMap<>(); + + for (int i = 0; i < KEYS / 2; i++) { + keys.add(i); + + expVals.put(i, String.valueOf(i)); + } + + for (int i = KEYS + 1000; i < KEYS + 1010; i++) + keys.add(i); + + CompletionListenerFuture fut = new CompletionListenerFuture(); + + log.info("Load1."); + + cache0.loadAll(keys, false, fut); + + fut.get(); + + checkValues(KEYS, expVals); + + HashMap expChangedVals = new HashMap<>(); + + for (int i = 0; i < KEYS / 2; i++) { + String val = "changed"; + + cache0.put(i, val); + + expChangedVals.put(i, val); + } + + checkValues(KEYS, expChangedVals); + + fut = new CompletionListenerFuture(); + + log.info("Load2."); + + cache0.loadAll(keys, false, fut); + + fut.get(); + + checkValues(KEYS, expChangedVals); + + log.info("Load3."); + + fut = new CompletionListenerFuture(); + + cache0.loadAll(keys, true, fut); + + fut.get(); + + checkValues(KEYS, expVals); + + for (int i = 0; i < KEYS; i++) { keys.add(i); - Set nonExistKeys = new HashSet<>(); + expVals.put(i, String.valueOf(i)); + } + + fut = new CompletionListenerFuture(); - for (int i = 10_000; i < 10_010; i++) - nonExistKeys.add(i); + log.info("Load4."); - keys.addAll(nonExistKeys); + cache0.loadAll(keys, false, fut); - CompletionListener lsnr = new CompletionListenerFuture(); + fut.get(); - cache.loadAll(keys, false, lsnr); + checkValues(KEYS, expVals); + } + /** + * @param keys Keys to check. + * @param expVals Expected values. + */ + private void checkValues(int keys, Map expVals) { GridCacheAffinity aff = cache(0).affinity(); for (int i = 0; i < gridCount(); i++) { ClusterNode node = ignite(i).cluster().localNode(); - IgniteCache cache0 = jcache(i); + IgniteCache cache = jcache(i); - for (int key = 0; key < 1000; key++) { - String expVal = (keys.contains(key) && !nonExistKeys.contains(key)) ? String.valueOf(key) : null; + for (int key = 0; key < keys; key++) { + String expVal = expVals.get(key); if (aff.isPrimaryOrBackup(node, key)) { - assertEquals(expVal, cache0.localPeek(key)); + assertEquals(expVal, cache.localPeek(key)); - assertEquals(expVal, cache0.get(key)); - } else { - assertNull(cache0.localPeek(key)); + assertEquals(expVal, cache.get(key)); + } + else { + assertNull(cache.localPeek(key)); - assertNull(cache0.get(key)); + if (!expVals.containsKey(key)) + assertNull(cache.get(key)); } + } + + for (int key = keys + 1000; i < keys + 1010; i++) { + assertNull(cache.localPeek(key)); + assertNull(cache.get(key)); } } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java index 8da0d01bd0ceb..68829d82623ce 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java @@ -17,69 +17,262 @@ package org.apache.ignite.internal.processors.cache.integration; +import org.apache.ignite.*; import org.apache.ignite.internal.processors.cache.*; +import org.jdk8.backport.*; import javax.cache.*; import javax.cache.configuration.*; import javax.cache.integration.*; +import javax.cache.processor.*; import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; /** * */ public abstract class IgniteCacheLoaderWriterAbstractTest extends IgniteCacheAbstractTest { + /** */ + private static AtomicInteger ldrCallCnt = new AtomicInteger(); + + /** */ + private static AtomicInteger writerCallCnt = new AtomicInteger(); + + /** */ + private static ConcurrentHashMap8 storeMap = new ConcurrentHashMap8<>(); + /** {@inheritDoc} */ @Override protected Factory loaderFactory() { - return super.loaderFactory(); + return new Factory() { + @Override public CacheLoader create() { + return new TestLoader(); + } + }; } /** {@inheritDoc} */ @Override protected Factory writerFactory() { - return super.writerFactory(); + return new Factory() { + @Override public CacheWriter create() { + return new TestWriter(); + } + }; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + ldrCallCnt.set(0); + + writerCallCnt.set(0); + + storeMap.clear(); + } + + protected boolean putFromPrimary() { + return atomicityMode() == ATOMIC; } /** * @throws Exception If failed. */ - public void testLoad() throws Exception { + public void testLoaderWriter() throws Exception { + final Object key = Integer.MAX_VALUE; + + for (int i = 0; i < gridCount(); i++) { + log.info("Test with grid: " + i); + + storeMap.clear(); + + ldrCallCnt.set(0); + writerCallCnt.set(0); + + IgniteCache cache = jcache(i); + + assertNull(cache.get(key)); + + checkCalls(1, 0); + + storeMap.put(key, "test"); + + assertEquals("test", cache.get(key)); + + checkCalls(2, 0); + + assertTrue(storeMap.containsKey(key)); + + cache.remove(key); + + checkCalls(2, 1); + + assertFalse(storeMap.containsKey(key)); + + assertNull(cache.get(key)); + + checkCalls(3, 1); + + cache.put(key, "test1"); + + checkCalls(3, 2); + + assertEquals("test1", storeMap.get(key)); + + assertEquals("test1", cache.get(key)); + + checkCalls(3, 2); + + cache.invoke(key, new EntryProcessor() { + @Override public Object process(MutableEntry e, Object... args) { + e.setValue("test2"); + + return null; + } + }); + + checkCalls(3, 3); + + assertEquals("test2", storeMap.get(key)); + + assertEquals("test2", cache.get(key)); + + checkCalls(3, 3); + + cache.remove(key); + + checkCalls(3, 4); + } } /** - * + * @throws Exception If failed. + */ + public void testLoaderWriterBulk() throws Exception { + Map vals = new HashMap<>(); + + for (int i = 0; i < 100; i++) + vals.put(i, i); + + for (int i = 0; i < gridCount(); i++) { + log.info("Test with grid: " + i); + + storeMap.clear(); + + ldrCallCnt.set(0); + writerCallCnt.set(0); + + IgniteCache cache = jcache(i); + + assertTrue(cache.getAll(vals.keySet()).isEmpty()); + + int expLoads = gridCount(); + + checkCalls(expLoads, 0); + + storeMap.putAll(vals); + + assertEquals(vals, cache.getAll(vals.keySet())); + + expLoads += gridCount(); + + checkCalls(expLoads, 0); + + for (Object key : vals.keySet()) + assertTrue(storeMap.contains(key)); + + cache.removeAll(vals.keySet()); + + checkCalls(expLoads, gridCount()); + + for (Object key : vals.keySet()) + assertFalse(storeMap.containsKey(key)); + } + } + + /** + * @param expLdr Expected loader calls count. + * @param expWriter Expected writer calls count. */ - static class TestLoader implements CacheLoader { - /** */ - private volatile Map lastOp; + private void checkCalls(int expLdr, int expWriter) { + assertEquals(expLdr, ldrCallCnt.get()); + + assertEquals(expWriter, writerCallCnt.get()); + } + /** + * + */ + class TestLoader implements CacheLoader { /** {@inheritDoc} */ - @Override public Integer load(Integer key) { - return null; + @Override public Object load(Object key) { + log.info("Load: " + key); + + ldrCallCnt.incrementAndGet(); + + return storeMap.get(key); } /** {@inheritDoc} */ - @Override public Map loadAll(Iterable keys) { - return null; + @Override public Map loadAll(Iterable keys) { + log.info("LoadAll: " + keys); + + ldrCallCnt.incrementAndGet(); + + Map loaded = new HashMap<>(); + + for (Object key : keys) { + Object val = storeMap.get(key); + + if (val != null) + loaded.put(key, val); + } + + return loaded; } } /** * */ - static class TestWriter implements CacheWriter { + class TestWriter implements CacheWriter { /** {@inheritDoc} */ - @Override public void write(Cache.Entry entry) { + @Override public void write(Cache.Entry e) { + log.info("Write: " + e); + + writerCallCnt.incrementAndGet(); + + storeMap.put(e.getKey(), e.getValue()); } /** {@inheritDoc} */ @Override public void writeAll(Collection> entries) { + log.info("WriteAll: " + entries); + + writerCallCnt.incrementAndGet(); + + for (Cache.Entry e : entries) + storeMap.put(e.getKey(), e.getValue()); } /** {@inheritDoc} */ @Override public void delete(Object key) { + log.info("Delete: " + key); + + writerCallCnt.incrementAndGet(); + + storeMap.remove(key); } /** {@inheritDoc} */ @Override public void deleteAll(Collection keys) { + log.info("DeleteAll: " + keys); + + writerCallCnt.incrementAndGet(); + + for (Object key : keys) + storeMap.remove(key); } } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxLoadAllTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxLoadAllTest.java new file mode 100644 index 0000000000000..ab256a9719053 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxLoadAllTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.integration; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*; +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheTxLoadAllTest extends IgniteCacheLoadAllAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxLoaderWriterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxLoaderWriterTest.java new file mode 100644 index 0000000000000..c1be2b00521fa --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxLoaderWriterTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.integration; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheTxLoaderWriterTest extends IgniteCacheLoaderWriterAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxLocalLoadAllTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxLocalLoadAllTest.java new file mode 100644 index 0000000000000..4cb7fb2c2391c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxLocalLoadAllTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.integration; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheTxLocalLoadAllTest extends IgniteCacheLoadAllAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return LOCAL; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledTxOriginatingNodeFailureSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledTxOriginatingNodeFailureSelfTest.java index 68c19a3c38124..fd981ce6161c8 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledTxOriginatingNodeFailureSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledTxOriginatingNodeFailureSelfTest.java @@ -24,7 +24,6 @@ */ public class GridCachePartitionedNearDisabledTxOriginatingNodeFailureSelfTest extends GridCachePartitionedTxOriginatingNodeFailureSelfTest { - /** {@inheritDoc} */ @Override protected GridCacheDistributionMode distributionMode() { return GridCacheDistributionMode.PARTITIONED_ONLY; diff --git a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java index 73c85f7bbbcf6..2e7775ba685b2 100644 --- a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java +++ b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java @@ -20,6 +20,7 @@ import junit.framework.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.expiry.*; +import org.apache.ignite.internal.processors.cache.integration.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.affinity.fair.*; import org.gridgain.grid.cache.store.*; @@ -335,6 +336,14 @@ public static TestSuite suite() throws Exception { suite.addTestSuite(GridCacheMultinodeUpdateAtomicSelfTest.class); suite.addTestSuite(GridCacheMultinodeUpdateAtomicNearEnabledSelfTest.class); + suite.addTestSuite(IgniteCacheAtomicLoadAllTest.class); + suite.addTestSuite(IgniteCacheAtomicLocalLoadAllTest.class); + suite.addTestSuite(IgniteCacheTxLoadAllTest.class); + suite.addTestSuite(IgniteCacheTxLocalLoadAllTest.class); + + suite.addTestSuite(IgniteCacheAtomicLoaderWriterTest.class); + //suite.addTestSuite(IgniteCacheTxLoaderWriterTest.class); + return suite; } }