Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.ignite.IgniteSnapshot;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.configuration.CacheConfiguration;
Expand Down Expand Up @@ -119,6 +120,7 @@
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
Expand Down Expand Up @@ -1386,6 +1388,9 @@ private void processLocalSnapshotEndStageResult(UUID id, Map<UUID, SnapshotOpera
markWalFut = null;

incSnpId = null;

if (clusterSnpFut != null && endFail.isEmpty() && snpReq.error() == null)
warnAtomicCachesInIncrementalSnapshot(snpReq.snapshotName(), snpReq.incrementIndex(), snpReq.groups());
}

clusterSnpReq = null;
Expand Down Expand Up @@ -2114,6 +2119,33 @@ public IgniteFutureImpl<Void> createSnapshot(String name, @Nullable String snpPa
}
}

/** Writes a warning message if an incremental snapshot contains atomic caches. */
void warnAtomicCachesInIncrementalSnapshot(String snpName, int incIdx, Collection<String> cacheGrps) {
List<String> warnCaches = new ArrayList<>();

for (String cacheGrp: cacheGrps) {
CacheGroupContext cgctx = cctx.cache().cacheGroup(CU.cacheId(cacheGrp));

if (cgctx != null && cgctx.hasAtomicCaches()) {
for (GridCacheContext<?, ?> c : cgctx.caches()) {
CacheConfiguration<?, ?> ccfg = c.config();

if (ccfg.getAtomicityMode() == CacheAtomicityMode.ATOMIC && ccfg.getBackups() > 0)
warnCaches.add(ccfg.getName());
}
}
}

if (warnCaches.isEmpty())
return;

U.warn(log, "Incremental snapshot [snpName=" + snpName + ", incIdx=" + incIdx + "] contains ATOMIC caches with backups: "
+ warnCaches + ". Please note, incremental snapshots doesn't guarantee consistency of restored atomic caches. " +
"It is highly recommended to verify these caches after restoring with the \"idle_verify\" command. " +
"If it is needed it's possible to repair inconsistent partitions with the \"consistency\" command. " +
"Please, check the \"Control Script\" section of Ignite docs for more information about these commands.");
}

/** {@inheritDoc} */
@Override public IgniteFuture<Void> restoreSnapshot(String name, @Nullable Collection<String> grpNames) {
return restoreSnapshot(name, null, grpNames, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1683,6 +1683,15 @@ private void finishIncrementalSnapshotRestore(UUID reqId, Map<UUID, Boolean> res
orElse(checkNodeLeft(opCtx0.nodes(), res.keySet()));

if (failure == null) {
if (fut != null) { // Call on originated node only.
Set<String> cacheGrps = opCtx0.cfgs.keySet().stream()
.map(cacheId -> CU.cacheOrGroupName(ctx.cache().cacheDescriptor(cacheId).cacheConfiguration()))
.collect(Collectors.toSet());

ctx.cache().context().snapshotMgr()
.warnAtomicCachesInIncrementalSnapshot(opCtx0.snpName, opCtx0.incIdx, cacheGrps);
}

finishProcess(reqId, null);

return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.snapshot.incremental;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

/** */
public class IncrementalSnapshotWarnAtomicCachesTest extends GridCommonAbstractTest {
/** */
private static final String SNP = "snapshot";

/** */
private static final ListeningTestLogger lsnLogger = new ListeningTestLogger();

/** */
private CacheConfiguration<Integer, Integer>[] ccfgs;

/** */
@Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(instanceName);

cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalCompactionEnabled(true)
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setName("persistence")
.setPersistenceEnabled(true)));

cfg.setCacheConfiguration(ccfgs);

cfg.setGridLogger(lsnLogger);

cfg.setConsistentId(String.valueOf(getTestIgniteInstanceIndex(instanceName)));

return cfg;
}

/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
cleanPersistenceDir();
}

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();

cleanPersistenceDir();
}

/** */
@Test
public void testTransactionalCacheNoWarn() throws Exception {
checkCachesSnapshotCreationAndRestore(
cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, 1, "cache3", null));
}

/** */
@Test
public void testDefaultCacheGroup() throws Exception {
checkCachesSnapshotCreationAndRestore(prepareCacheConfs(null, null, null, null));
}

/** */
@Test
public void testMultipleCachesInGroupWarn() throws Exception {
checkCachesSnapshotCreationAndRestore(prepareCacheConfs(null, "grp0", "grp0", null));
}

/** */
@Test
public void testMixedGroupWarnOnlyAtomic() throws Exception {
checkCachesSnapshotCreationAndRestore(prepareCacheConfs(null, null, "grp0", "grp0"));
}

/** */
private CacheConfiguration<Integer, Integer>[] prepareCacheConfs(String grp0, String grp1, String grp2, String grp3) {
return new CacheConfiguration[] {
cacheConfiguration(CacheAtomicityMode.ATOMIC, 0, "cache0", grp0),
cacheConfiguration(CacheAtomicityMode.ATOMIC, 1, "cache1", grp1),
cacheConfiguration(CacheAtomicityMode.ATOMIC, 1, "cache2", grp2),
cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, 1, "cache3", grp3)
};
}

/** */
public void checkCachesSnapshotCreationAndRestore(CacheConfiguration<Integer, Integer>... ccfgs) throws Exception {
List<Integer> allWarnCaches = new ArrayList<>();
Map<String, List<Integer>> warnCachesByGrps = new HashMap<>();

for (CacheConfiguration<?, ?> ccfg: ccfgs) {
if (ccfg.getAtomicityMode() == CacheAtomicityMode.ATOMIC && ccfg.getBackups() > 0) {
String grpName = ccfg.getGroupName() == null ? ccfg.getName() : ccfg.getGroupName();

warnCachesByGrps.compute(grpName, (grp, caches) -> {
caches = caches == null ? new ArrayList<>() : caches;

int cacheNum = Integer.parseInt(ccfg.getName().replace("cache", ""));

caches.add(cacheNum);

allWarnCaches.add(cacheNum);

return caches;
});
}
}

checkWarnMessageOnCreateSnapshot(cachesPattern(allWarnCaches), ccfgs);
checkWarnMessageOnRestoreSnapshot(cachesPattern(allWarnCaches), null);

for (String grp: warnCachesByGrps.keySet())
checkWarnMessageOnRestoreSnapshot(cachesPattern(warnCachesByGrps.get(grp)), F.asList(grp));
}

/** Transforms cache numbers to cache pattern. For example, [0, 1] -> cache[0,1], cache[0,1]. */
private @Nullable String cachesPattern(List<Integer> cacheNums) {
if (cacheNums.isEmpty())
return null;

return cacheNums.stream()
.map(c -> "cache" + cacheNums)
.collect(Collectors.joining(", "));
}

/** */
private void checkWarnMessageOnCreateSnapshot(
@Nullable String warnAtomicCaches,
CacheConfiguration<Integer, Integer>... ccfgs
) throws Exception {
this.ccfgs = ccfgs;

Ignite g = startGrids(3);

g.cluster().state(ClusterState.ACTIVE);

for (CacheConfiguration<Integer, Integer> c: ccfgs) {
for (int i = 0; i < 1_000; i++)
g.cache(c.getName()).put(i, i);
}

LogListener lsnr = warnLogListener(warnAtomicCaches, 0); // Should not warn for full snapshots.

lsnLogger.registerListener(lsnr);

g.snapshot().createSnapshot(SNP).get(getTestTimeout());

assertTrue(warnAtomicCaches, lsnr.check());

for (CacheConfiguration<Integer, Integer> c: ccfgs) {
for (int i = 1_000; i < 2_000; i++)
g.cache(c.getName()).put(i, i);
}

lsnr = warnLogListener(warnAtomicCaches, warnAtomicCaches == null ? 0 : 1);

lsnLogger.registerListener(lsnr);

g.snapshot().createIncrementalSnapshot(SNP).get(getTestTimeout());

assertTrue(warnAtomicCaches, lsnr.check());
}

/** */
private void checkWarnMessageOnRestoreSnapshot(
@Nullable String warnAtomicCaches,
@Nullable Collection<String> restoreCacheGrps
) throws Exception {
stopAllGrids();

cleanPersistenceDir(true);

Ignite g = startGrids(3);

g.cluster().state(ClusterState.ACTIVE);

g.destroyCaches(g.cacheNames());

awaitPartitionMapExchange();

LogListener lsnr = warnLogListener(warnAtomicCaches, 0); // Should not warn for full snapshots.

lsnLogger.registerListener(lsnr);

g.snapshot().restoreSnapshot(SNP, restoreCacheGrps).get(getTestTimeout());

assertTrue(warnAtomicCaches + " " + restoreCacheGrps, lsnr.check());

g.destroyCaches(g.cacheNames());

awaitPartitionMapExchange();

lsnr = warnLogListener(warnAtomicCaches, warnAtomicCaches == null ? 0 : 1);

lsnLogger.registerListener(lsnr);

g.snapshot().restoreIncrementalSnapshot(SNP, restoreCacheGrps, 1).get(getTestTimeout());

assertTrue(warnAtomicCaches + " " + restoreCacheGrps, lsnr.check());
}

/** */
private LogListener warnLogListener(@Nullable String atomicCaches, int times) {
Pattern p = Pattern.compile(
"Incremental snapshot \\[snpName=" + SNP + ", incIdx=1] contains ATOMIC caches with backups:"
+ (atomicCaches == null ? "" : " \\[" + atomicCaches) + ']');

return LogListener.matches(p).times(times).build();
}

/** */
private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode mode, int backups, String name, String grpName) {
return new CacheConfiguration<Integer, Integer>()
.setName(name)
.setGroupName(grpName)
.setAtomicityMode(mode)
.setBackups(backups)
.setAffinity(new RendezvousAffinityFunction().setPartitions(100));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.ignite.internal.processors.cache.persistence.snapshot.incremental.IncrementalSnapshotTwoBackupMessagesBlockingTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.incremental.IncrementalSnapshotTwoBackupWALBlockingTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.incremental.IncrementalSnapshotTxRecoveryTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.incremental.IncrementalSnapshotWarnAtomicCachesTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

Expand All @@ -46,7 +47,8 @@
IncrementalSnapshotNodeFailureTest.class,
IncrementalSnapshotTxRecoveryTest.class,
IncrementalSnapshotTest.class,
IncrementalSnapshotRestoreTest.class
IncrementalSnapshotRestoreTest.class,
IncrementalSnapshotWarnAtomicCachesTest.class
})
public class IncrementalSnapshotsTestSuite {
}