Skip to content

Commit

Permalink
GG-27745 Develop cache inconsistency checker code review refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
ingvard committed Feb 27, 2020
1 parent c3fd156 commit b2b4dd5
Show file tree
Hide file tree
Showing 47 changed files with 609 additions and 594 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ public enum CacheCommandList {
IDLE_VERIFY("idle_verify", new IdleVerify()),

/**
* Check consistency of primary and backup partitions assuming that there might be concurrent updated.
* Checks consistency of primary and backup partitions assuming that there might be concurrently updated.
*/
PARTITION_RECONCILIATION("partition-reconciliation", new PartitionReconciliation()),

/**
* Does cancel of partition reconciliation command.
* Cancels partition reconciliation command.
*/
PARTITION_RECONCILIATION_CANCEL("partition-reconciliation-cancel", new PartitionReconciliationCancel()),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public enum CacheSubcommands {
IDLE_VERIFY("idle_verify", IdleVerifyCommandArg.class, new IdleVerify()),

/**
* Check consistency of primary and backup partitions assuming that there might be concurrent updated.
* Checks consistency of primary and backup partitions assuming that there might be concurrently updated.
*/
PARTITION_RECONCILIATION("partition-reconciliation", PartitionReconciliationCommandArg.class, new PartitionReconciliation()),

/**
* Does cancel of partition reconciliation command.
* Cancels partition reconciliation command.
*/
PARTITION_RECONCILIATION_CANCEL("partition-reconciliation-cancel", null, new PartitionReconciliationCancel()),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.ignite.internal.commandline.CommandLogger;
import org.apache.ignite.internal.commandline.argument.CommandArgUtils;
import org.apache.ignite.internal.commandline.cache.argument.PartitionReconciliationCommandArg;
import org.apache.ignite.internal.processors.cache.checker.objects.PartitionReconciliationResult;
import org.apache.ignite.internal.processors.cache.checker.objects.ReconciliationAffectedEntries;
import org.apache.ignite.internal.processors.cache.checker.objects.ReconciliationResult;
import org.apache.ignite.internal.processors.cache.verify.RepairAlgorithm;
import org.apache.ignite.internal.util.typedef.internal.SB;
Expand Down Expand Up @@ -69,28 +69,29 @@
*/
public class PartitionReconciliation implements Command<PartitionReconciliation.Arguments> {
/** Parallelism format error message. */
public static final String PARALLELISM_FORMAT_MESSAGE = "Invalid parallelism: %s. Integer value " +
"from 1 to 128 should be specified, or 0 (Runtime.getRuntime().availableProcessors() " +
"will be used in such case).";
public static final String PARALLELISM_FORMAT_MESSAGE = "The positive integer should be specified, " +
"or 0 (number of cores on a server node will be used as parallelism in such case). " +
"If the given value is greater than the number of cores on a server node, " +
"the behavior will be equal to the case when 0 is specified.";

/** Batch size format error message. */
public static final String BATCH_SIZE_FORMAT_MESSAGE = "Invalid batch size: %s" +
". Int value greater than zero should be used.";
". Integer value greater than zero should be used.";

/** Recheck attempts format error message. */
public static final String RECHECK_ATTEMPTS_FORMAT_MESSAGE = "Invalid recheck attempts: %s" +
". Int value between 1 and 5 should be used.";
". Integer value between 1 (inclusive) and 5 (exclusive) should be used.";

/** Recheck delay format error message. */
public static final String RECHECK_DELAY_FORMAT_MESSAGE = "Invalid recheck delay: %s" +
". Int value between 0 and 100 should be used.";
". Integer value between 0 (inclusive) and 100 (exclusive) should be used.";

/** Command parsed arguments. */
private Arguments args;

/** {@inheritDoc} */
@Override public void printUsage(Logger log) {
String CACHES = "cacheName1,...,cacheNameN";
String caches = "cacheName1,...,cacheNameN";

String desc = "Verify whether there are inconsistent entries for the specified caches " +
"and print out the differences if any. Fix inconsistency if " + REPAIR + "argument is presented. " +
Expand All @@ -103,18 +104,17 @@ public class PartitionReconciliation implements Command<PartitionReconciliation.

paramsDesc.put(REPAIR.toString(),
"If present, fix all inconsistent data. Specifies which repair algorithm to use for doubtful keys. The following values can be used: "
+ Arrays.toString(RepairAlgorithm.values()) + " Default value is " + REPAIR.defaultValue() + '.');
+ Arrays.toString(RepairAlgorithm.values()) + ". Default value is " + REPAIR.defaultValue() + '.');

paramsDesc.put(PARALLELISM.toString(),
"Maximum number of threads that can be involved in partition reconciliation activities on one node. " +
"Default value is " + PARALLELISM.defaultValue() + ", which means the value will be initialzed with " +
"Runtime.getRuntime().availableProcessors() of a server node.");
"Default value equals number of cores.");

paramsDesc.put(BATCH_SIZE.toString(),
"Amount of keys to retrieve within one job. Default value is " + BATCH_SIZE.defaultValue() + '.');

paramsDesc.put(RECHECK_ATTEMPTS.toString(),
"Amount of potentially inconsistent keys recheck attempts. Value between 1 and 5 should be used." +
"Amount of potentially inconsistent keys recheck attempts. Value between 1 (inclusive) and 5 (exclusive) should be used." +
" Default value is " + RECHECK_ATTEMPTS.defaultValue() + '.');

paramsDesc.put(INCLUDE_SENSITIVE.toString(),
Expand All @@ -129,7 +129,7 @@ public class PartitionReconciliation implements Command<PartitionReconciliation.
desc,
paramsDesc,
optional(REPAIR), optional(PARALLELISM), optional(BATCH_SIZE), optional(RECHECK_ATTEMPTS),
optional(INCLUDE_SENSITIVE), optional(CACHES));
optional(INCLUDE_SENSITIVE), optional(caches));
}

/** {@inheritDoc} */
Expand All @@ -155,7 +155,7 @@ public class PartitionReconciliation implements Command<PartitionReconciliation.
}

/**
* Prepare arguments, execute partition reconciliation task, print logs and optionally fix inconsistency.
* Prepares arguments, executes partition reconciliation task, prints logs and optionally fix inconsistency.
*
* @param client Client node to run initial task.
* @param clientCfg Client configuration.
Expand All @@ -170,9 +170,9 @@ private ReconciliationResult partitionReconciliationCheck(
) throws GridClientException {
VisorPartitionReconciliationTaskArg taskArg = new VisorPartitionReconciliationTaskArg(
args.caches,
args.fixMode,
args.verbose,
args.console,
args.repair,
args.includeSensitive,
args.locOutput,
args.parallelism,
args.batchSize,
args.recheckAttempts,
Expand All @@ -192,7 +192,7 @@ private ReconciliationResult partitionReconciliationCheck(
.map(n -> String.format(strErrReason, n.nodeId(), n.consistentId()))
.collect(toList());

print(new ReconciliationResult(new PartitionReconciliationResult(), new HashMap<>(), errs), log::info);
print(new ReconciliationResult(new ReconciliationAffectedEntries(), new HashMap<>(), errs), log::info);

throw new VisorIllegalStateException("There are server nodes not supported partition reconciliation.");
}
Expand All @@ -209,9 +209,9 @@ private ReconciliationResult partitionReconciliationCheck(
/** {@inheritDoc} */
@Override public void parseArguments(CommandArgIterator argIter) {
Set<String> cacheNames = null;
boolean fixMode = false;
boolean repair = false;
boolean verbose = (boolean)INCLUDE_SENSITIVE.defaultValue();
boolean console = (boolean)LOCAL_OUTPUT.defaultValue();
boolean locOutput = (boolean)LOCAL_OUTPUT.defaultValue();
int parallelism = (int)PARALLELISM.defaultValue();
int batchSize = (int)BATCH_SIZE.defaultValue();
int recheckAttempts = (int)RECHECK_ATTEMPTS.defaultValue();
Expand All @@ -236,11 +236,11 @@ private ReconciliationResult partitionReconciliationCheck(

switch (arg) {
case REPAIR:
fixMode = true;
repair = true;

String peekedNextArg = argIter.peekNextArg();

if (!PartitionReconciliationCommandArg.commands().contains(peekedNextArg)) {
if (!PartitionReconciliationCommandArg.args().contains(peekedNextArg)) {
strVal = argIter.nextArg(
"The repair algorithm should be specified. The following " +
"values can be used: " + Arrays.toString(RepairAlgorithm.values()) + '.');
Expand All @@ -263,7 +263,7 @@ private ReconciliationResult partitionReconciliationCheck(
break;

case LOCAL_OUTPUT:
console = true;
locOutput = true;

break;

Expand All @@ -277,7 +277,7 @@ private ReconciliationResult partitionReconciliationCheck(
throw new IllegalArgumentException(String.format(PARALLELISM_FORMAT_MESSAGE, strVal));
}

if (parallelism < 0 || parallelism > 128)
if (parallelism < 0)
throw new IllegalArgumentException(String.format(PARALLELISM_FORMAT_MESSAGE, strVal));

break;
Expand Down Expand Up @@ -316,7 +316,7 @@ private ReconciliationResult partitionReconciliationCheck(
strVal = argIter.nextArg("The recheck delay should be specified.");

try {
recheckDelay = Integer.valueOf(strVal);
recheckDelay = Integer.parseInt(strVal);
}
catch (NumberFormatException e) {
throw new IllegalArgumentException(String.format(RECHECK_DELAY_FORMAT_MESSAGE, strVal));
Expand All @@ -330,7 +330,7 @@ private ReconciliationResult partitionReconciliationCheck(
}
}

args = new Arguments(cacheNames, fixMode, verbose, console, parallelism, batchSize, recheckAttempts, repairAlg,
args = new Arguments(cacheNames, repair, verbose, locOutput, parallelism, batchSize, recheckAttempts, repairAlg,
recheckDelay);
}

Expand All @@ -357,16 +357,16 @@ private String prepareHeaderMeta() {
options
.a("caches=[")
.a(args.caches() == null ? "" : String.join(", ", args.caches()))
.a("], fix-mode=[" + args.fixMode)
.a("], verbose=[" + args.verbose)
.a("], repair=[" + args.repair)
.a("], includeSensitive=[" + args.includeSensitive)
.a("], parallelism=[" + args.parallelism)
.a("], batch-size=[" + args.batchSize)
.a("], recheck-attempts=[" + args.recheckAttempts)
.a("], fix-alg=[" + args.repairAlg + "]")
.a("], recheck-delay=[" + args.recheckDelay + "]")
.a(System.lineSeparator());

if (args.verbose) {
if (args.includeSensitive) {
options
.a("WARNING: Please be aware that sensitive data will be printed to the console and output file(s).")
.a(System.lineSeparator());
Expand Down Expand Up @@ -406,15 +406,15 @@ private String prepareResultFolders(
* @param printer Printer.
*/
private void print(ReconciliationResult res, Consumer<String> printer) {
PartitionReconciliationResult reconciliationRes = res.partitionReconciliationResult();
ReconciliationAffectedEntries reconciliationRes = res.partitionReconciliationResult();

printer.accept(prepareHeaderMeta());

printer.accept(prepareErrors(res.errors()));

printer.accept(prepareResultFolders(res.nodeIdToFolder(), reconciliationRes.nodesIdsToConsistenceIdsMap()));

reconciliationRes.print(printer, args.verbose);
reconciliationRes.print(printer, args.includeSensitive);
}

/**
Expand Down Expand Up @@ -446,14 +446,14 @@ protected static class Arguments {
/** List of caches to be checked. */
private final Set<String> caches;

/** Flag indicates that an inconsistency should be fixed in accordance with RepairAlgorithm parameter. */
private final boolean fixMode;
/** Flag indicates that an inconsistency should be fixed in accordance with {@link RepairAlgorithm} parameter. */
private final boolean repair;

/** Flag indicates that the result should include sensitive information such as key and value. */
private final boolean verbose;
private final boolean includeSensitive;

/** Flag indicates that the result is printed to the console. */
private final boolean console;
private final boolean locOutput;

/** Maximum number of threads that can be involved in reconciliation activities. */
private final int parallelism;
Expand All @@ -474,19 +474,30 @@ protected static class Arguments {
* Constructor.
*
* @param caches Caches.
* @param fixMode Fix inconsistency if {@code True}.
* @param verbose Print key and value to result log if {@code True}.
* @param repair Fix inconsistency if {@code true}.
* @param includeSensitive Print key and value to result log if {@code true}.
* @param locOutput Print result to local console.
* @param parallelism Maximum number of threads that can be involved in reconciliation activities.
* @param batchSize Batch size.
* @param recheckAttempts Amount of recheck attempts.
* @param repairAlg Partition reconciliation repair algorithm to be used.
* @param recheckDelay Recheck delay in seconds.
*/
public Arguments(Set<String> caches, boolean fixMode, boolean verbose, boolean console,
public Arguments(
Set<String> caches,
boolean repair,
boolean includeSensitive,
boolean locOutput,
int parallelism,
int batchSize, int recheckAttempts, RepairAlgorithm repairAlg, int recheckDelay) {
int batchSize,
int recheckAttempts,
RepairAlgorithm repairAlg,
int recheckDelay
) {
this.caches = caches;
this.fixMode = fixMode;
this.verbose = verbose;
this.console = console;
this.repair = repair;
this.includeSensitive = includeSensitive;
this.locOutput = locOutput;
this.parallelism = parallelism;
this.batchSize = batchSize;
this.recheckAttempts = recheckAttempts;
Expand All @@ -504,8 +515,8 @@ public Set<String> caches() {
/**
* @return Fix mode.
*/
public boolean fixMode() {
return fixMode;
public boolean repair() {
return repair;
}

/**
Expand All @@ -530,17 +541,17 @@ public int recheckAttempts() {
}

/**
* @return Verbose.
* @return Include sensitive.
*/
public boolean verbose() {
return verbose;
public boolean includeSensitive() {
return includeSensitive;
}

/**
* @return Print to console.
*/
public boolean console() {
return console;
public boolean locOutput() {
return locOutput;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.client.GridClientConfiguration;
import org.apache.ignite.internal.commandline.Command;
import org.apache.ignite.internal.commandline.CommandArgIterator;
import org.apache.ignite.internal.visor.checker.VisorPartitionReconciliationCancelTask;

import static org.apache.ignite.internal.commandline.TaskExecutor.executeTask;
Expand All @@ -34,7 +33,7 @@
public class PartitionReconciliationCancel implements Command<Void> {
/** {@inheritDoc} */
@Override public void printUsage(Logger log) {
String desc = "Does cancel of partition reconciliation command.";
String desc = "Cancels partition reconciliation command.";

usageCache(log, PARTITION_RECONCILIATION_CANCEL, desc, new HashMap<>());
}
Expand All @@ -50,15 +49,11 @@ public class PartitionReconciliationCancel implements Command<Void> {
}

/** {@inheritDoc} */
@Override public Object execute(GridClientConfiguration clientCfg, Logger log) throws Exception {
@Override public Void execute(GridClientConfiguration clientCfg, Logger log) throws Exception {
try (GridClient client = Command.startClient(clientCfg)) {
executeTask(client, VisorPartitionReconciliationCancelTask.class, null, clientCfg);
}

return null;
}

/** {@inheritDoc} */
@Override public void parseArguments(CommandArgIterator argIter) {
}
}

0 comments on commit b2b4dd5

Please sign in to comment.