Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-21482: Fix initialization of the last version in GridCacheVersionManager - Tests #11246

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
111 changes: 69 additions & 42 deletions modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
Expand All @@ -43,6 +44,7 @@
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.jetbrains.annotations.Nullable;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_MARSHALLER_PATH;
Expand All @@ -62,52 +64,52 @@ public class DumpReader implements Runnable {
private final DumpReaderConfiguration cfg;

/** Log. */
private final IgniteLogger log;
protected final IgniteLogger log;

/** */
protected final Map<Integer, List<String>> grpToNodes = new HashMap<>();

/** */
@Nullable protected final Map<Integer, String> cacheGrpIds;

/** */
protected Dump dump;

/** */
protected DumpConsumer cnsmr;

/** */
protected final ExecutorService execSvc;

/**
 * @param cfg Dump reader configuration.
 * @param log Logger.
 */
public DumpReader(DumpReaderConfiguration cfg, IgniteLogger log) {
    this.cfg = cfg;

    // getClass() keeps the logger category correct for subclasses as well.
    this.log = log.getLogger(getClass());

    // Cache group id -> group name filter, or null when no filter is configured.
    this.cacheGrpIds = cfg.cacheGroupNames() != null
        ? Arrays.stream(cfg.cacheGroupNames()).collect(Collectors.toMap(CU::cacheId, Function.identity()))
        : null;

    // BUG FIX: Math.min(1, threadCount) always capped the pool at a single thread,
    // silently ignoring the configured parallelism. Math.max(1, threadCount) honors
    // cfg.threadCount() while still guaranteeing at least one worker thread.
    this.execSvc = Executors.newFixedThreadPool(Math.max(1, cfg.threadCount()));
}

/** {@inheritDoc} */
@Override public void run() {
ackAsciiLogo();

try (Dump dump = new Dump(cfg.dumpRoot(), null, cfg.keepBinary(), false, encryptionSpi(), log)) {
DumpConsumer cnsmr = cfg.consumer();
this.dump = dump;
this.cnsmr = cfg.consumer();

cnsmr.start();

try {
File[] files = new File(cfg.dumpRoot(), DFLT_MARSHALLER_PATH).listFiles(BinaryUtils::notTmpFile);

if (files != null)
cnsmr.onMappings(CdcMain.typeMappingIterator(files, tm -> true));

cnsmr.onTypes(dump.types());
dump.metadata().forEach(this::onMeta);

Map<Integer, List<String>> grpToNodes = new HashMap<>();

Set<Integer> cacheGrpIds = cfg.cacheGroupNames() != null
? Arrays.stream(cfg.cacheGroupNames()).map(CU::cacheId).collect(Collectors.toSet())
: null;

for (SnapshotMetadata meta : dump.metadata()) {
for (Integer grp : meta.cacheGroupIds()) {
if (cacheGrpIds == null || cacheGrpIds.contains(grp))
grpToNodes.computeIfAbsent(grp, key -> new ArrayList<>()).add(meta.folderName());
}
}

cnsmr.onCacheConfigs(grpToNodes.entrySet().stream()
.flatMap(e -> dump.configs(F.first(e.getValue()), e.getKey()).stream())
.iterator());

ExecutorService execSvc = cfg.threadCount() > 1 ? Executors.newFixedThreadPool(cfg.threadCount()) : null;
beforePartitions();

AtomicBoolean skip = new AtomicBoolean(false);

Expand Down Expand Up @@ -155,32 +157,57 @@ public DumpReader(DumpReaderConfiguration cfg, IgniteLogger log) {
}
};

if (cfg.threadCount() > 1)
execSvc.submit(consumePart);
else
consumePart.run();
execSvc.submit(consumePart);
}
}
}

if (cfg.threadCount() > 1) {
execSvc.shutdown();
execSvc.shutdown();

boolean res = execSvc.awaitTermination(cfg.timeout().toMillis(), MILLISECONDS);
boolean res = execSvc.awaitTermination(cfg.timeout().toMillis(), MILLISECONDS);

if (!res) {
log.warning("Dump processing tasks not finished after timeout. Cancelling");
if (!res) {
log.warning("Dump processing tasks not finished after timeout. Cancelling");

execSvc.shutdownNow();
}
execSvc.shutdownNow();
}
}
finally {
cnsmr.stop();
try {
cnsmr.stop();
}
finally {
this.dump = null;
this.cnsmr = null;

this.execSvc.shutdown();
}
}
}
catch (Exception e) {
throw new IgniteException(e);
throw e instanceof IgniteException ? (IgniteException)e : new IgniteException("Failed to run dump reader.", e);
}
}

/**
 * Feeds the consumer with everything that precedes partition data:
 * marshaller type mappings, binary types and cache configurations.
 */
protected void beforePartitions() {
    File marshallerDir = new File(cfg.dumpRoot(), DFLT_MARSHALLER_PATH);

    // Temporary files are skipped: only fully written mapping files are replayed.
    File[] mappingFiles = marshallerDir.listFiles(BinaryUtils::notTmpFile);

    if (mappingFiles != null)
        cnsmr.onMappings(CdcMain.typeMappingIterator(mappingFiles, tm -> true));

    cnsmr.onTypes(dump.types());

    // For each selected group, configs are read from the first node folder that holds it.
    cnsmr.onCacheConfigs(grpToNodes.entrySet().stream()
        .flatMap(e -> dump.configs(F.first(e.getValue()), e.getKey()).stream())
        .iterator());
}

/**
 * Registers, for every cache group of the given node metadata that passes the
 * configured group filter, the node folder containing that group's data.
 *
 * @param meta Snapshot metadata of a single node.
 */
protected void onMeta(SnapshotMetadata meta) {
    for (Integer grp : meta.cacheGroupIds()) {
        // containsKey is the direct, idiomatic form of keySet().contains().
        if (cacheGrpIds == null || cacheGrpIds.containsKey(grp))
            grpToNodes.computeIfAbsent(grp, key -> new ArrayList<>()).add(meta.folderName());
    }
}

Expand Down Expand Up @@ -234,8 +261,8 @@ private void ackAsciiLogo() {
}
}

/** */
private EncryptionSpi encryptionSpi() {
/** @return EncryptionSpi. */
public EncryptionSpi encryptionSpi() {
EncryptionSpi encSpi = cfg.encryptionSpi();

if (encSpi == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1920,73 +1920,7 @@ public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult> checkSnapshot(
Map<Integer, String> grpIds = grps == null ? Collections.emptyMap() :
grps.stream().collect(Collectors.toMap(CU::cacheId, v -> v));

byte[] masterKeyDigest = kctx0.config().getEncryptionSpi().masterKeyDigest();

for (List<SnapshotMetadata> nodeMetas : metas.values()) {
for (SnapshotMetadata meta : nodeMetas) {
byte[] snpMasterKeyDigest = meta.masterKeyDigest();

if (masterKeyDigest == null && snpMasterKeyDigest != null) {
res.onDone(new SnapshotPartitionsVerifyTaskResult(metas, new IdleVerifyResultV2(
Collections.singletonMap(cctx.localNode(), new IllegalArgumentException("Snapshot '" +
meta.snapshotName() + "' has encrypted caches while encryption is disabled. To " +
"restore this snapshot, start Ignite with configured encryption and the same " +
"master key.")))));

return;
}

if (snpMasterKeyDigest != null && !Arrays.equals(snpMasterKeyDigest, masterKeyDigest)) {
res.onDone(new SnapshotPartitionsVerifyTaskResult(metas, new IdleVerifyResultV2(
Collections.singletonMap(cctx.localNode(), new IllegalArgumentException("Snapshot '" +
meta.snapshotName() + "' has different master key digest. To restore this " +
"snapshot, start Ignite with the same master key.")))));

return;
}

if (meta.hasCompressedGroups() && grpIds.keySet().stream().anyMatch(meta::isGroupWithCompresion)) {
try {
kctx0.compress().checkPageCompressionSupported();
}
catch (IgniteCheckedException e) {
String grpWithCompr = grpIds.entrySet().stream()
.filter(grp -> meta.isGroupWithCompresion(grp.getKey()))
.map(Map.Entry::getValue).collect(Collectors.joining(", "));

String msg = "Requested cache groups [" + grpWithCompr + "] for check " +
"from snapshot '" + meta.snapshotName() + "' are compressed while " +
"disk page compression is disabled. To check these groups please " +
"start Ignite with ignite-compress module in classpath";

res.onDone(new SnapshotPartitionsVerifyTaskResult(metas, new IdleVerifyResultV2(
Collections.singletonMap(cctx.localNode(), new IllegalArgumentException(msg)))));

return;
}
}

grpIds.keySet().removeAll(meta.partitions().keySet());
}
}

if (!grpIds.isEmpty()) {
res.onDone(new SnapshotPartitionsVerifyTaskResult(metas,
new IdleVerifyResultV2(Collections.singletonMap(cctx.localNode(),
new IllegalArgumentException("Cache group(s) was not " +
"found in the snapshot [groups=" + grpIds.values() + ", snapshot=" + name + ']')))));

return;
}

if (metas.isEmpty()) {
res.onDone(new SnapshotPartitionsVerifyTaskResult(metas,
new IdleVerifyResultV2(Collections.singletonMap(cctx.localNode(),
new IllegalArgumentException("Snapshot does not exists [snapshot=" + name +
(snpPath != null ? ", baseDir=" + snpPath : "") + ']')))));

return;
}
checkMetas(name, snpPath, metas, grpIds, res, kctx0.config().getEncryptionSpi(), kctx0.compress(), cctx.localNode());

Class<? extends AbstractSnapshotVerificationTask> cls;

Expand Down Expand Up @@ -2028,6 +1962,84 @@ else if (f0.error() instanceof IgniteSnapshotVerifyException)
return res;
}

/**
 * Validates collected snapshot metadata before a check/restore operation: encryption
 * master key compatibility, page-compression support for compressed groups, presence
 * of every requested cache group and non-emptiness of the metadata itself.
 * <p>
 * On the first failed validation the {@code res} future is completed with a
 * {@link SnapshotPartitionsVerifyTaskResult} carrying the error and the method returns.
 * On success the future is left incomplete for the caller to continue processing.
 * <p>
 * NOTE: mutates the caller-supplied {@code grpIds} map — group ids found in the
 * snapshot are removed from it.
 *
 * @param snpName Snapshot name (used in error messages).
 * @param snpPath Optional snapshot directory path, may be {@code null}.
 * @param metas Per-node lists of snapshot metadata.
 * @param grpIds Requested cache group id to group name mapping; drained as groups are found.
 * @param res Future completed with an error result when validation fails.
 * @param encryptionSpi Local encryption SPI used to obtain the master key digest.
 * @param compressionProc Compression processor used to probe page compression support.
 * @param node Local node to attach validation exceptions to.
 */
public static void checkMetas(
String snpName,
String snpPath,
Map<ClusterNode, List<SnapshotMetadata>> metas,
Map<Integer, String> grpIds,
GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> res,
EncryptionSpi encryptionSpi,
CompressionProcessor compressionProc,
ClusterNode node
) {
// Local node's master key digest; null when encryption is disabled locally.
byte[] masterKeyDigest = encryptionSpi.masterKeyDigest();

for (List<SnapshotMetadata> nodeMetas : metas.values()) {
for (SnapshotMetadata meta : nodeMetas) {
byte[] snpMasterKeyDigest = meta.masterKeyDigest();

// Snapshot contains encrypted caches but local encryption is turned off.
if (masterKeyDigest == null && snpMasterKeyDigest != null) {
res.onDone(new SnapshotPartitionsVerifyTaskResult(metas, new IdleVerifyResultV2(
Collections.singletonMap(node, new IllegalArgumentException("Snapshot '" +
meta.snapshotName() + "' has encrypted caches while encryption is disabled. To " +
"restore this snapshot, start Ignite with configured encryption and the same " +
"master key.")))));

return;
}

// Snapshot was taken with a different master key than the one configured locally.
if (snpMasterKeyDigest != null && !Arrays.equals(snpMasterKeyDigest, masterKeyDigest)) {
res.onDone(new SnapshotPartitionsVerifyTaskResult(metas, new IdleVerifyResultV2(
Collections.singletonMap(node, new IllegalArgumentException("Snapshot '" +
meta.snapshotName() + "' has different master key digest. To restore this " +
"snapshot, start Ignite with the same master key.")))));

return;
}

// Some requested groups are stored compressed; verify disk page compression is available.
if (meta.hasCompressedGroups() && grpIds.keySet().stream().anyMatch(meta::isGroupWithCompresion)) {
try {
compressionProc.checkPageCompressionSupported();
}
catch (IgniteCheckedException e) {
String grpWithCompr = grpIds.entrySet().stream()
.filter(grp -> meta.isGroupWithCompresion(grp.getKey()))
.map(Map.Entry::getValue).collect(Collectors.joining(", "));

String msg = "Requested cache groups [" + grpWithCompr + "] for check " +
"from snapshot '" + meta.snapshotName() + "' are compressed while " +
"disk page compression is disabled. To check these groups please " +
"start Ignite with ignite-compress module in classpath";

res.onDone(new SnapshotPartitionsVerifyTaskResult(metas, new IdleVerifyResultV2(
Collections.singletonMap(node, new IllegalArgumentException(msg)))));

return;
}
}

// Drop the groups this node's metadata covers; whatever remains after the
// loop was not found in the snapshot at all.
grpIds.keySet().removeAll(meta.partitions().keySet());
}
}

// Requested groups left unmatched by every node's metadata.
if (!grpIds.isEmpty()) {
res.onDone(new SnapshotPartitionsVerifyTaskResult(metas,
new IdleVerifyResultV2(Collections.singletonMap(node,
new IllegalArgumentException("Cache group(s) was not " +
"found in the snapshot [groups=" + grpIds.values() + ", snapshot=" + snpName + ']')))));

return;
}

// No metadata at all means the snapshot itself is missing.
if (metas.isEmpty()) {
res.onDone(new SnapshotPartitionsVerifyTaskResult(metas,
new IdleVerifyResultV2(Collections.singletonMap(node,
new IllegalArgumentException("Snapshot does not exists [snapshot=" + snpName +
(snpPath != null ? ", baseDir=" + snpPath : "") + ']')))));
}
}

/**
* @param snpName Snapshot name.
* @param folderName The name of a directory for the cache group.
Expand Down Expand Up @@ -3283,13 +3295,13 @@ private void initialize(GridKernalContext ctx, ExecutorService execSvc) {
this.execSvc = execSvc;

// Register system default snapshot integrity check that is used before the restore operation.
registerHandler(new SnapshotPartitionsVerifyHandler(ctx.cache().context()));
registerHandler(new SnapshotPartitionsVerifyHandler(ctx));

// Register system default DataStreamer updates check.
registerHandler(new DataStreamerUpdatesHandler());

// Register system default page size and counters check that is used at the creation operation.
registerHandler(new SnapshotPartitionsQuickVerifyHandler(ctx.cache().context()));
registerHandler(new SnapshotPartitionsQuickVerifyHandler(ctx));

// Register custom handlers.
SnapshotHandler<Object>[] extHnds = (SnapshotHandler<Object>[])ctx.plugins().extensions(SnapshotHandler.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ public void warnings(List<String> warnings) {
/**
* @return Snapshot creation warnings.
*/
public List<String> warnings() {
@Nullable public List<String> warnings() {
return warnings;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.util.Objects;
import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.management.cache.PartitionKeyV2;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
Expand All @@ -41,10 +41,10 @@ public class SnapshotPartitionsQuickVerifyHandler extends SnapshotPartitionsVeri
"able restore rest the caches from this snapshot.";

/**
 * @param ctx Grid kernal context.
 */
public SnapshotPartitionsQuickVerifyHandler(GridKernalContext ctx) {
    super(ctx);
}

/** {@inheritDoc} */
Expand Down