From 161892eee237a197d93f29c4e332fe1b0a7c9397 Mon Sep 17 00:00:00 2001 From: dkarachentsev Date: Wed, 6 Dec 2017 16:06:11 +0300 Subject: [PATCH 1/5] IGNITE-7086 - Backups are not updated when ReadFromBackup=true and ReadThrough happens. --- .../distributed/dht/GridDhtCacheAdapter.java | 1 + .../dht/GridPartitionedGetFuture.java | 77 +++++- .../dht/GridPartitionedSingleGetFuture.java | 55 ++++- .../distributed/near/GridNearGetFuture.java | 79 +++++- .../store/CacheStoreReadFromBackupTest.java | 231 ++++++++++++++++++ ...eTransactionalStoreReadFromBackupTest.java | 32 +++ .../testsuites/IgniteCacheTestSuite.java | 4 + 7 files changed, 470 insertions(+), 9 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreReadFromBackupTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/cache/store/CacheTransactionalStoreReadFromBackupTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index bbb2c5b4983d3..28f9c7627ef5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -287,6 +287,7 @@ public void dumpDebugInfo() { } } + /** {@inheritDoc} */ @Override public void onKernalStop() { super.onKernalStop(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 015eb82411560..e54ea77be540e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.EntryGetResult; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -41,10 +42,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -57,6 +59,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -224,7 +227,7 @@ private boolean isMini(IgniteInternalFuture f) { private void map( Collection keys, Map> mapped, - AffinityTopologyVersion topVer + final AffinityTopologyVersion topVer ) { Collection cacheNodes = CU.affinityNodes(cctx, topVer); @@ -331,7 +334,53 @@ private void map( })); } else { - MiniFuture fut = new MiniFuture(n, mappedKeys, topVer); + IgniteInClosure> clos = null; + + if (readThrough && !skipVals) { + clos = new CI1>() { + @Override public void apply(Collection infos) { + if (!F.isEmpty(infos)) { + GridCacheAffinityManager aff = cctx.affinity(); + ClusterNode locNode = cctx.localNode(); + + for (GridCacheEntryInfo info : infos) { + if (aff.backupsByKey(info.key(), topVer).contains(locNode)) { + while (true) { + GridCacheEntryEx entry = null; + GridDhtCacheAdapter colocated = cctx.dht(); + + try { + entry = colocated.entryEx(info.key(), topVer); + + entry.initialValue( + info.value(), + info.version(), + 0, + 0, + false, + topVer, + GridDrType.DR_BACKUP, + true); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during postprocessing (will retry): " + + entry); + } + catch (IgniteCheckedException e) { + U.error(log, "Error saving backup value: " + entry, e); + } + } + } + } + } + } + }; + } + + MiniFuture fut = new MiniFuture(n, mappedKeys, topVer, clos); GridCacheMessage req = new GridNearGetRequest( cctx.cacheId(), @@ -647,6 +696,9 @@ private class MiniFuture extends GridFutureAdapter> { /** Topology version on which this future was mapped. */ private final AffinityTopologyVersion topVer; + /** Post processing closure. */ + private final IgniteInClosure> postProcessingClos; + /** {@code True} if remapped after node left. */ private boolean remapped; @@ -654,11 +706,14 @@ private class MiniFuture extends GridFutureAdapter> { * @param node Node. * @param keys Keys. * @param topVer Topology version. + * @param postProcessingClos Post processing closure. */ - MiniFuture(ClusterNode node, LinkedHashMap keys, AffinityTopologyVersion topVer) { + MiniFuture(ClusterNode node, LinkedHashMap keys, AffinityTopologyVersion topVer, + @Nullable IgniteInClosure> postProcessingClos) { this.node = node; this.keys = keys; this.topVer = topVer; + this.postProcessingClos = postProcessingClos; } /** @@ -774,6 +829,8 @@ void onResult(final GridNearGetResponse res) { } }), F.t(node, keys), topVer); + postProcessResult(res); + onDone(createResultMap(res.entries())); return; @@ -795,12 +852,16 @@ void onResult(final GridNearGetResponse res) { } }), F.t(node, keys), topVer); + postProcessResult(res); + onDone(createResultMap(res.entries())); } }); } else { try { + postProcessResult(res); + onDone(createResultMap(res.entries())); } catch (Exception e) { @@ -809,6 +870,14 @@ void onResult(final GridNearGetResponse res) { } } + /** + * @param res Response. + */ + private void postProcessResult(final GridNearGetResponse res) { + if (postProcessingClos != null) + postProcessingClos.apply(res.entries()); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(MiniFuture.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 61489e5ba3b6b..bbecb0913be9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -41,18 +41,20 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; @@ -122,6 +124,9 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter postProcessingClos; + /** * @param cctx Context. * @param key Key. @@ -275,6 +280,46 @@ private void map(final AffinityTopologyVersion topVer) { cctx.mvcc().addFuture(this, futId); } + boolean needVer = this.needVer; + + if (readThrough && !skipVals && cctx.affinity().backupsByKey(key, topVer).contains(cctx.localNode())) { + // Need version to correctly store value. + needVer = true; + + postProcessingClos = new IgniteBiInClosure() { + @Override public void apply(CacheObject obj, GridCacheVersion ver) { + while (true) { + GridCacheEntryEx entry = null; + GridDhtCacheAdapter colocated = cctx.dht(); + + try { + entry = colocated.entryEx(key, topVer); + + entry.initialValue( + obj, + ver, + 0, + 0, + false, + topVer, + GridDrType.DR_BACKUP, + true); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during postprocessing (will retry): " + + entry); + } + catch (IgniteCheckedException e) { + U.error(log, "Error saving backup value: " + entry, e); + } + } + } + }; + } + GridCacheMessage req = new GridNearSingleGetRequest(cctx.cacheId(), futId.localId(), key, @@ -502,6 +547,11 @@ public void onResult(UUID nodeId, GridNearSingleGetResponse res) { else { if (skipVals) setSkipValueResult(res.containsValue(), null); + else if (readThrough && res0 instanceof CacheVersionedValue/* postProcessingClos != null*/) { // TODO Volatile read or instanceof!!! + CacheVersionedValue verVal = (CacheVersionedValue)res0; + + setResult(verVal.value(), verVal.version()); + } else setResult((CacheObject)res0, null); } @@ -640,6 +690,9 @@ private void setResult(@Nullable CacheObject val, @Nullable GridCacheVersion ver assert !skipVals; if (val != null) { + if (postProcessingClos != null) + postProcessingClos.apply(val, ver); + if (!keepCacheObjects) { Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 23615073cb28c..d44965ed0aadb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.EntryGetResult; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -47,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -59,6 +61,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -363,7 +366,55 @@ private void map( cctx.mvcc().addFuture(this, futId); } - MiniFuture fut = new MiniFuture(n, mappedKeys, saved, topVer); + IgniteInClosure> clos = null; + + if (readThrough && !skipVals) { + clos = new CI1>() { + @Override public void apply(Collection infos) { + if (!F.isEmpty(infos)) { + GridCacheAffinityManager aff = cctx.affinity(); + ClusterNode locNode = cctx.localNode(); + + for (GridCacheEntryInfo info : infos) { + if (aff.backupsByKey(info.key(), topVer).contains(locNode)) { + while (true) { + assert cctx.cache().isNear(); + + GridCacheEntryEx entry = null; + GridDhtCacheAdapter colocated = ((GridNearCacheAdapter)cctx.cache()).dht(); + + try { + entry = colocated.entryEx(info.key(), topVer); + + entry.initialValue( + info.value(), + info.version(), + 0, + 0, + false, + topVer, + GridDrType.DR_BACKUP, + true); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during postprocessing (will retry): " + + entry); + } + catch (IgniteCheckedException e) { + U.error(log, "Error saving backup value: " + entry, e); + } + } + } + } + } + } + }; + } + + MiniFuture fut = new MiniFuture(n, mappedKeys, saved, topVer, clos); GridCacheMessage req = new GridNearGetRequest( cctx.cacheId(), @@ -834,6 +885,9 @@ private class MiniFuture extends GridFutureAdapter> { /** Topology version on which this future was mapped. */ private AffinityTopologyVersion topVer; + /** Post processing closure. */ + private final IgniteInClosure> postProcessingClos; + /** {@code True} if remapped after node left. */ private boolean remapped; @@ -842,17 +896,19 @@ private class MiniFuture extends GridFutureAdapter> { * @param keys Keys. * @param savedEntries Saved entries. * @param topVer Topology version. + * @param postProcessingClos Post processing closure. */ MiniFuture( ClusterNode node, LinkedHashMap keys, Map savedEntries, - AffinityTopologyVersion topVer - ) { + AffinityTopologyVersion topVer, + IgniteInClosure> postProcessingClos) { this.node = node; this.keys = keys; this.savedEntries = savedEntries; this.topVer = topVer; + this.postProcessingClos = postProcessingClos; } /** @@ -977,6 +1033,8 @@ void onResult(final GridNearGetResponse res) { } }), F.t(node, keys), topVer); + postProcessResult(res); + // It is critical to call onDone after adding futures to compound list. onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer)); @@ -998,13 +1056,26 @@ void onResult(final GridNearGetResponse res) { } }), F.t(node, keys), readyTopVer); + postProcessResult(res); + // It is critical to call onDone after adding futures to compound list. onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer)); } }); } - else + else { + postProcessResult(res); + onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer)); + } + } + + /** + * @param res Response. + */ + private void postProcessResult(final GridNearGetResponse res) { + if (postProcessingClos != null) + postProcessingClos.apply(res.entries()); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreReadFromBackupTest.java new file mode 100644 index 0000000000000..a2ff2fb5903cc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreReadFromBackupTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.cache.Cache; +import javax.cache.configuration.FactoryBuilder; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CachePeekMode.BACKUP; +import static org.apache.ignite.cache.CachePeekMode.PRIMARY; + +/** + * Checks that once value is read from store, it will be loaded in + * backups as well. + */ +public class CacheStoreReadFromBackupTest extends GridCommonAbstractTest { + /** */ + public static final String CACHE_NAME = "cache"; + + /** */ + private static final Map storeMap = new ConcurrentHashMap<>(); + + /** */ + private CacheMode cacheMode = REPLICATED; + + /** */ + private int backups; + + /** Near. */ + private boolean near; + + /** */ + @SuppressWarnings("unchecked") + private CacheConfiguration cacheConfig(String cacheName) { + CacheConfiguration ccfg = new CacheConfiguration<>(cacheName); + + ccfg.setCacheMode(cacheMode); + ccfg.setBackups(backups); + ccfg.setAtomicityMode(atomicityMode()); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 1)); + ccfg.setReadThrough(true); + ccfg.setReadFromBackup(true); + ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(TestStore.class)); + + if (near) + ccfg.setNearConfiguration(new NearCacheConfiguration()); + + return ccfg; + } + + /** */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + return super.getConfiguration(gridName).setCacheConfiguration(cacheConfig(CACHE_NAME)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @return Atomicity mode. + */ + protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** + * @throws Exception If failed. + */ + public void testReplicated() throws Exception { + cacheMode = REPLICATED; + backups = 0; + near = false; + + checkReadFromBackup(); + } + + /** + * @throws Exception If failed. + */ + public void testPartitioned() throws Exception { + cacheMode = PARTITIONED; + backups = 1; + near = false; + + checkReadFromBackup(); + } + + /** + * @throws Exception If failed. + */ + public void testNearReplicated() throws Exception { + cacheMode = REPLICATED; + backups = 0; + near = true; + + checkReadFromBackup(); + } + + /** + * @throws Exception If failed. + */ + public void testNearPartitioned() throws Exception { + cacheMode = PARTITIONED; + backups = 1; + near = true; + + checkReadFromBackup(); + } + + /** + * @throws Exception If failed. + */ + private void checkReadFromBackup() throws Exception { + startGridsMultiThreaded(2, true); + + checkReadSingleFromBackup(); + checkReadAllFromBackup(); + } + + private void checkReadSingleFromBackup() throws Exception { + storeMap.put(1, "val-1"); + + IgniteCache cache0 = grid(0).cache(CACHE_NAME); + IgniteCache cache1 = grid(1).cache(CACHE_NAME); + + // Load value on primary and backup. + assertNotNull(cache0.get(1)); + assertNotNull(cache1.get(1)); + + if (cache0.localPeek(1, PRIMARY) != null) + assertNotNull(cache1.localPeek(1, BACKUP)); + else { + assertNotNull(cache0.localPeek(1, BACKUP)); + assertNotNull(cache1.localPeek(1, PRIMARY)); + } + } + + private void checkReadAllFromBackup() throws Exception { + for (int i = 0; i < 100; i++) + storeMap.put(i, String.valueOf(i)); + + IgniteCache cache0 = grid(0).cache(CACHE_NAME); + IgniteCache cache1 = grid(1).cache(CACHE_NAME); + + assertEquals(storeMap.size(), cache0.getAll(storeMap.keySet()).size()); + assertEquals(storeMap.size(), cache1.getAll(storeMap.keySet()).size()); + + Affinity aff = grid(0).affinity(CACHE_NAME); + ClusterNode node0 = grid(0).cluster().localNode(); + + for (Integer key : storeMap.keySet()) { + if (aff.isPrimary(node0, key)) { + assertNotNull(cache0.localPeek(key, PRIMARY)); + assertNotNull(cache1.localPeek(key, BACKUP)); + } + else { + assertNotNull(cache0.localPeek(key, BACKUP)); + assertNotNull(cache1.localPeek(key, PRIMARY)); + } + } + } + + /** + * + */ + public static class TestStore extends CacheStoreAdapter { + /** */ + public TestStore() { + } + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure clo, Object... args) { + for (Map.Entry e : storeMap.entrySet()) + clo.apply(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public String load(Integer key) { + return storeMap.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) { + storeMap.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + storeMap.remove(key); + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(boolean commit) { + // No-op. + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheTransactionalStoreReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheTransactionalStoreReadFromBackupTest.java new file mode 100644 index 0000000000000..4837936621f46 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheTransactionalStoreReadFromBackupTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import org.apache.ignite.cache.CacheAtomicityMode; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * + */ +public class CacheTransactionalStoreReadFromBackupTest extends CacheStoreReadFromBackupTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index e3ebbc16e00fe..b309b0af3db47 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -20,6 +20,8 @@ import java.util.Set; import junit.framework.TestSuite; import org.apache.ignite.cache.IgniteWarmupClosureSelfTest; +import org.apache.ignite.cache.store.CacheStoreReadFromBackupTest; +import org.apache.ignite.cache.store.CacheTransactionalStoreReadFromBackupTest; import org.apache.ignite.cache.store.GridCacheBalancingStoreSelfTest; import org.apache.ignite.cache.store.GridCacheLoadOnlyStoreAdapterSelfTest; import org.apache.ignite.cache.store.GridStoreLoadCacheTest; @@ -318,6 +320,8 @@ public static TestSuite suite(Set ignoredTests) throws Exception { suite.addTestSuite(IgniteIncompleteCacheObjectSelfTest.class); suite.addTestSuite(GridStoreLoadCacheTest.class); + suite.addTestSuite(CacheStoreReadFromBackupTest.class); + suite.addTestSuite(CacheTransactionalStoreReadFromBackupTest.class); return suite; } From 4c97d6b74a36a4b2e4ad01d4a4c5112d0a598541 Mon Sep 17 00:00:00 2001 From: dkarachentsev Date: Thu, 7 Dec 2017 11:07:24 +0300 Subject: [PATCH 2/5] IGNITE-7086 - Review fixes. --- .../dht/CacheDistributedGetFutureAdapter.java | 73 +++++++++++++++++++ .../dht/GridPartitionedGetFuture.java | 50 +------------ .../dht/GridPartitionedSingleGetFuture.java | 15 +++- .../distributed/near/GridNearGetFuture.java | 54 +------------- 4 files changed, 89 insertions(+), 103 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java index 2257c9f35734a..65f0ec3677f5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -22,16 +22,26 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; +import org.apache.ignite.internal.processors.dr.GridDrType; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -186,4 +196,67 @@ protected final ClusterTopologyServerNotFoundException serverNotFoundError(Affin return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'); } + + /** + * @param topVer Topology version. + * @param log Logger. + */ + protected CI1> createPostProcessingClosure( + final AffinityTopologyVersion topVer, final IgniteLogger log) { + if (!readThrough || skipVals) + return null; + + return new CI1>() { + @Override public void apply(Collection infos) { + if (!F.isEmpty(infos)) { + GridCacheAffinityManager aff = cctx.affinity(); + ClusterNode locNode = cctx.localNode(); + + for (GridCacheEntryInfo info : infos) { + if (aff.backupsByKey(info.key(), topVer).contains(locNode)) { + GridDhtCacheAdapter colocated = cctx.cache().isNear() + ? ((GridNearCacheAdapter)cctx.cache()).dht() + : cctx.dht(); + + while (true) { + GridCacheEntryEx entry = null; + + try { + entry = colocated.entryEx(info.key(), topVer); + + entry.initialValue( + info.value(), + info.version(), + 0, + 0, + false, + topVer, + GridDrType.DR_BACKUP, + true); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during postprocessing (will retry): " + + entry); + } + catch (GridDhtInvalidPartitionException ignored) { + break; + } + catch (IgniteCheckedException e) { + U.error(log, "Error saving backup value: " + entry, e); + } + finally { + assert entry != null; + + cctx.evicts().touch(entry, topVer); + } + } + } + } + } + } + }; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index e54ea77be540e..6ce68675ce7c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -34,7 +34,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.EntryGetResult; -import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -46,7 +45,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -334,53 +332,7 @@ private void map( })); } else { - IgniteInClosure> clos = null; - - if (readThrough && !skipVals) { - clos = new CI1>() { - @Override public void apply(Collection infos) { - if (!F.isEmpty(infos)) { - GridCacheAffinityManager aff = cctx.affinity(); - ClusterNode locNode = cctx.localNode(); - - for (GridCacheEntryInfo info : infos) { - if (aff.backupsByKey(info.key(), topVer).contains(locNode)) { - while (true) { - GridCacheEntryEx entry = null; - GridDhtCacheAdapter colocated = cctx.dht(); - - try { - entry = colocated.entryEx(info.key(), topVer); - - entry.initialValue( - info.value(), - info.version(), - 0, - 0, - false, - topVer, - GridDrType.DR_BACKUP, - true); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry during postprocessing (will retry): " + - entry); - } - catch (IgniteCheckedException e) { - U.error(log, "Error saving backup value: " + entry, e); - } - } - } - } - } - } - }; - } - - MiniFuture fut = new MiniFuture(n, mappedKeys, topVer, clos); + MiniFuture fut = new MiniFuture(n, mappedKeys, topVer, createPostProcessingClosure(topVer, log)); GridCacheMessage req = new GridNearGetRequest( cctx.cacheId(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index bbecb0913be9f..79eeaa841d57d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -287,7 +287,7 @@ private void map(final AffinityTopologyVersion topVer) { needVer = true; postProcessingClos = new IgniteBiInClosure() { - @Override public void apply(CacheObject obj, GridCacheVersion ver) { + @Override public void apply(CacheObject val, GridCacheVersion ver) { while (true) { GridCacheEntryEx entry = null; GridDhtCacheAdapter colocated = cctx.dht(); @@ -296,7 +296,7 @@ private void map(final AffinityTopologyVersion topVer) { entry = colocated.entryEx(key, topVer); entry.initialValue( - obj, + val, ver, 0, 0, @@ -315,6 +315,14 @@ private void map(final AffinityTopologyVersion topVer) { catch (IgniteCheckedException e) { U.error(log, "Error saving backup value: " + entry, e); } + catch (GridDhtInvalidPartitionException ignored) { + break; + } + finally { + assert entry != null; + + cctx.evicts().touch(entry, topVer); + } } } }; @@ -547,7 +555,8 @@ public void onResult(UUID nodeId, GridNearSingleGetResponse res) { else { if (skipVals) setSkipValueResult(res.containsValue(), null); - else if (readThrough && res0 instanceof CacheVersionedValue/* postProcessingClos != null*/) { // TODO Volatile read or instanceof!!! + else if (readThrough && res0 instanceof CacheVersionedValue) { + // Could be versioned value for store in backup. CacheVersionedValue verVal = (CacheVersionedValue)res0; setResult(verVal.value(), verVal.version()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index d44965ed0aadb..46e71df633b24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -34,7 +34,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.EntryGetResult; -import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -48,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -366,55 +364,7 @@ private void map( cctx.mvcc().addFuture(this, futId); } - IgniteInClosure> clos = null; - - if (readThrough && !skipVals) { - clos = new CI1>() { - @Override public void apply(Collection infos) { - if (!F.isEmpty(infos)) { - GridCacheAffinityManager aff = cctx.affinity(); - ClusterNode locNode = cctx.localNode(); - - for (GridCacheEntryInfo info : infos) { - if (aff.backupsByKey(info.key(), topVer).contains(locNode)) { - while (true) { - assert cctx.cache().isNear(); - - GridCacheEntryEx entry = null; - GridDhtCacheAdapter colocated = ((GridNearCacheAdapter)cctx.cache()).dht(); - - try { - entry = colocated.entryEx(info.key(), topVer); - - entry.initialValue( - info.value(), - info.version(), - 0, - 0, - false, - topVer, - GridDrType.DR_BACKUP, - true); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry during postprocessing (will retry): " + - entry); - } - catch (IgniteCheckedException e) { - U.error(log, "Error saving backup value: " + entry, e); - } - } - } - } - } - } - }; - } - - MiniFuture fut = new MiniFuture(n, mappedKeys, saved, topVer, clos); + MiniFuture fut = new MiniFuture(n, mappedKeys, saved, topVer, createPostProcessingClosure(topVer, log)); GridCacheMessage req = new GridNearGetRequest( cctx.cacheId(), @@ -449,6 +399,8 @@ private void map( } } + + /** * @param mappings Mappings. * @param key Key to map. From 767b9cd1b64a1f6c2ab99beb9b7205aa2f3ec5a8 Mon Sep 17 00:00:00 2001 From: dkarachentsev Date: Thu, 7 Dec 2017 11:21:45 +0300 Subject: [PATCH 3/5] IGNITE-7086 - Javadoc. --- .../ignite/cache/store/CacheStoreReadFromBackupTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreReadFromBackupTest.java index a2ff2fb5903cc..d8913dcf1992f 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreReadFromBackupTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreReadFromBackupTest.java @@ -151,6 +151,9 @@ private void checkReadFromBackup() throws Exception { checkReadAllFromBackup(); } + /** + * @throws Exception If failed. + */ private void checkReadSingleFromBackup() throws Exception { storeMap.put(1, "val-1"); @@ -169,6 +172,9 @@ private void checkReadSingleFromBackup() throws Exception { } } + /** + * @throws Exception If failed. + */ private void checkReadAllFromBackup() throws Exception { for (int i = 0; i < 100; i++) storeMap.put(i, String.valueOf(i)); @@ -219,6 +225,7 @@ public TestStore() { } /** {@inheritDoc} */ + @SuppressWarnings("SuspiciousMethodCalls") @Override public void delete(Object key) { storeMap.remove(key); } From e6431d133b6ec0ba17e02069e99ccce54f4eb31a Mon Sep 17 00:00:00 2001 From: dkarachentsev Date: Thu, 7 Dec 2017 17:10:09 +0300 Subject: [PATCH 4/5] IGNITE-7086 - Review fixes. --- .../processors/cache/GridCacheUtils.java | 104 ++++++++++++++++++ .../dht/CacheDistributedGetFutureAdapter.java | 73 ------------ .../dht/GridPartitionedGetFuture.java | 3 +- .../dht/GridPartitionedSingleGetFuture.java | 47 +------- .../distributed/near/GridNearGetFuture.java | 9 +- 5 files changed, 115 insertions(+), 121 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 248f2aada0a52..f50872369fbb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -66,10 +66,14 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; @@ -84,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; @@ -1677,6 +1682,98 @@ else if (cfg.getCacheMode() == REPLICATED) { } } + /** + * Creates closure that saves initial value to backup partition. + *

+ * Useful only when store with readThrough is used. In situation when + * get() on backup node returns successful result, it's expected that + * localPeek() will be successful as well. But it doesn't true when + * primary node loaded value from local store, in this case backups + * will remain non-initialized. + *
+ * To meet that requirement the value requested from primary should + * be saved on backup during get(). + *

+ * + * @param topVer Topology version. + * @param log Logger. + * @param cctx Cache context. + * @param key Key. + * @param readThrough Read through. + * @param skipVals Skip values. + */ + @Nullable public static BackupPostProcessingClosure createBackupPostProcessingClosure( + final AffinityTopologyVersion topVer, + final IgniteLogger log, + final GridCacheContext cctx, + final @Nullable KeyCacheObject key, + boolean readThrough, + boolean skipVals + ) { + if (!readThrough || skipVals) + return null; + + return new BackupPostProcessingClosure() { + private void process(KeyCacheObject key, CacheObject val, GridCacheVersion ver, GridDhtCacheAdapter colocated) { + while (true) { + GridCacheEntryEx entry = null; + + try { + entry = colocated.entryEx(key, topVer); + + entry.initialValue( + val, + ver, + 0, + 0, + false, + topVer, + GridDrType.DR_BACKUP, + true); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during postprocessing (will retry): " + + entry); + } + catch (IgniteCheckedException e) { + U.error(log, "Error saving backup value: " + entry, e); + } + catch (GridDhtInvalidPartitionException ignored) { + break; + } + finally { + if (entry != null) + cctx.evicts().touch(entry, topVer); + } + } + } + + @Override public void apply(CacheObject val, GridCacheVersion ver) { + process(key, val, ver, cctx.dht()); + } + + @Override public void apply(Collection infos) { + if (!F.isEmpty(infos)) { + GridCacheAffinityManager aff = cctx.affinity(); + ClusterNode locNode = cctx.localNode(); + + GridDhtCacheAdapter colocated = cctx.cache().isNear() + ? ((GridNearCacheAdapter)cctx.cache()).dht() + : cctx.dht(); + + for (GridCacheEntryInfo info : infos) { + // Save backup value. + if (aff.backupsByKey(info.key(), topVer).contains(locNode)) + process(info.key(), info.value(), info.version(), colocated); + } + } + } + }; + } + /** * Checks if cache configuration belongs to persistent cache. * @@ -1752,4 +1849,11 @@ public static boolean isPersistenceEnabled(DataStorageConfiguration cfg) { return false; } + + /** + * + */ + public interface BackupPostProcessingClosure extends IgniteInClosure>, + IgniteBiInClosure{ + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java index 65f0ec3677f5d..2257c9f35734a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -22,26 +22,16 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; -import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; -import org.apache.ignite.internal.processors.dr.GridDrType; -import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -196,67 +186,4 @@ protected final ClusterTopologyServerNotFoundException serverNotFoundError(Affin return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'); } - - /** - * @param topVer Topology version. - * @param log Logger. - */ - protected CI1> createPostProcessingClosure( - final AffinityTopologyVersion topVer, final IgniteLogger log) { - if (!readThrough || skipVals) - return null; - - return new CI1>() { - @Override public void apply(Collection infos) { - if (!F.isEmpty(infos)) { - GridCacheAffinityManager aff = cctx.affinity(); - ClusterNode locNode = cctx.localNode(); - - for (GridCacheEntryInfo info : infos) { - if (aff.backupsByKey(info.key(), topVer).contains(locNode)) { - GridDhtCacheAdapter colocated = cctx.cache().isNear() - ? ((GridNearCacheAdapter)cctx.cache()).dht() - : cctx.dht(); - - while (true) { - GridCacheEntryEx entry = null; - - try { - entry = colocated.entryEx(info.key(), topVer); - - entry.initialValue( - info.value(), - info.version(), - 0, - 0, - false, - topVer, - GridDrType.DR_BACKUP, - true); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry during postprocessing (will retry): " + - entry); - } - catch (GridDhtInvalidPartitionException ignored) { - break; - } - catch (IgniteCheckedException e) { - U.error(log, "Error saving backup value: " + entry, e); - } - finally { - assert entry != null; - - cctx.evicts().touch(entry, topVer); - } - } - } - } - } - } - }; - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 6ce68675ce7c1..3954cf97e0ff9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -332,7 +332,8 @@ private void map( })); } else { - MiniFuture fut = new MiniFuture(n, mappedKeys, topVer, createPostProcessingClosure(topVer, log)); + MiniFuture fut = new MiniFuture(n, mappedKeys, topVer, + CU.createBackupPostProcessingClosure(topVer, log, cctx, null, readThrough, skipVals)); GridCacheMessage req = new GridNearGetRequest( cctx.cacheId(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 79eeaa841d57d..b7c12fbd0062e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter; import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheUtils.BackupPostProcessingClosure; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; @@ -47,14 +48,13 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; @@ -125,7 +125,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter postProcessingClos; + private volatile BackupPostProcessingClosure postProcessingClos; /** * @param cctx Context. @@ -286,46 +286,7 @@ private void map(final AffinityTopologyVersion topVer) { // Need version to correctly store value. needVer = true; - postProcessingClos = new IgniteBiInClosure() { - @Override public void apply(CacheObject val, GridCacheVersion ver) { - while (true) { - GridCacheEntryEx entry = null; - GridDhtCacheAdapter colocated = cctx.dht(); - - try { - entry = colocated.entryEx(key, topVer); - - entry.initialValue( - val, - ver, - 0, - 0, - false, - topVer, - GridDrType.DR_BACKUP, - true); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry during postprocessing (will retry): " + - entry); - } - catch (IgniteCheckedException e) { - U.error(log, "Error saving backup value: " + entry, e); - } - catch (GridDhtInvalidPartitionException ignored) { - break; - } - finally { - assert entry != null; - - cctx.evicts().touch(entry, topVer); - } - } - } - }; + postProcessingClos = CU.createBackupPostProcessingClosure(topVer, log, cctx, key, readThrough, skipVals); } GridCacheMessage req = new GridNearSingleGetRequest(cctx.cacheId(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 46e71df633b24..75b9baf565502 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheUtils.BackupPostProcessingClosure; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter; @@ -59,7 +60,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -364,7 +364,8 @@ private void map( cctx.mvcc().addFuture(this, futId); } - MiniFuture fut = new MiniFuture(n, mappedKeys, saved, topVer, createPostProcessingClosure(topVer, log)); + MiniFuture fut = new MiniFuture(n, mappedKeys, saved, topVer, + CU.createBackupPostProcessingClosure(topVer, log, cctx, null, readThrough, skipVals)); GridCacheMessage req = new GridNearGetRequest( cctx.cacheId(), @@ -838,7 +839,7 @@ private class MiniFuture extends GridFutureAdapter> { private AffinityTopologyVersion topVer; /** Post processing closure. */ - private final IgniteInClosure> postProcessingClos; + private final BackupPostProcessingClosure postProcessingClos; /** {@code True} if remapped after node left. */ private boolean remapped; @@ -855,7 +856,7 @@ private class MiniFuture extends GridFutureAdapter> { LinkedHashMap keys, Map savedEntries, AffinityTopologyVersion topVer, - IgniteInClosure> postProcessingClos) { + BackupPostProcessingClosure postProcessingClos) { this.node = node; this.keys = keys; this.savedEntries = savedEntries; From ffd33a84e48504645bc74c3650464f9eab0c5c4a Mon Sep 17 00:00:00 2001 From: dkarachentsev Date: Fri, 8 Dec 2017 13:30:51 +0300 Subject: [PATCH 5/5] IGNITE-7086 - Review fixes. --- .../ignite/internal/processors/cache/GridCacheUtils.java | 3 ++- .../distributed/dht/GridPartitionedSingleGetFuture.java | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index f50872369fbb1..4bf54bf2626e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1710,7 +1710,8 @@ else if (cfg.getCacheMode() == REPLICATED) { boolean readThrough, boolean skipVals ) { - if (!readThrough || skipVals) + if (!readThrough || skipVals || + (key != null && !cctx.affinity().backupsByKey(key, topVer).contains(cctx.localNode()))) return null; return new BackupPostProcessingClosure() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index b7c12fbd0062e..f761b9c42d2e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -282,11 +282,13 @@ private void map(final AffinityTopologyVersion topVer) { boolean needVer = this.needVer; - if (readThrough && !skipVals && cctx.affinity().backupsByKey(key, topVer).contains(cctx.localNode())) { + final BackupPostProcessingClosure postClos = CU.createBackupPostProcessingClosure(topVer, log, cctx, key, readThrough, skipVals); + + if (postClos != null) { // Need version to correctly store value. needVer = true; - postProcessingClos = CU.createBackupPostProcessingClosure(topVer, log, cctx, key, readThrough, skipVals); + postProcessingClos = postClos; } GridCacheMessage req = new GridNearSingleGetRequest(cctx.cacheId(),