Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSANDRA-19336 5.0: Ensure that repair doesn't exceed repair_session_space by running repair jobs sequentially #3073

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
805 changes: 516 additions & 289 deletions .circleci/config.yml

Large diffs are not rendered by default.

36 changes: 30 additions & 6 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -738,18 +738,42 @@ concurrent_materialized_view_writes: 32
# off heap objects
memtable_allocation_type: heap_buffers

# Limit memory usage for Merkle tree calculations during repairs. The default
# is 1/16th of the available heap. The main tradeoff is that smaller trees
# have less resolution, which can lead to over-streaming data. If you see heap
# pressure during repairs, consider lowering this, but you cannot go below
# one mebibyte. If you see lots of over-streaming, consider raising
# this or using subrange repair.
# Limit memory usage for Merkle tree calculations during repairs of a certain
# table and common token range. Repair commands targeting multiple tables or
# virtual nodes can exceed this limit depending on concurrent_merkle_tree_requests.
#
# The default is 1/16th of the available heap. The main tradeoff is that
# smaller trees have less resolution, which can lead to over-streaming data.
# If you see heap pressure during repairs, consider lowering this, but you
# cannot go below one mebibyte. If you see lots of over-streaming, consider
# raising this or using subrange repair.
#
# For more details see https://issues.apache.org/jira/browse/CASSANDRA-14096.
#
# Min unit: MiB
# repair_session_space:

# The number of simultaneous Merkle tree requests during repairs that can
# be performed by a repair command. The size of each validation request is
# limited by the repair_session_space property, so setting this to 1 will make
# sure that a repair command doesn't exceed that limit, even if the repair
# command is repairing multiple tables or multiple virtual nodes.
#
# There isn't a limit by default for backwards compatibility, but this can
# produce OOM for commands repairing multiple tables or multiple virtual nodes.
# A limit of just 1 simultaneous Merkle tree request is generally recommended
# with no virtual nodes so repair_session_space, and therefore the Merkle tree
# resolution, can be high. For virtual nodes a value of 1 with the default
# repair_session_space value will produce higher resolution Merkle trees
# at the expense of speed. Alternatively, when working with virtual nodes it
# can make sense to reduce the repair_session_space and increase the value of
# concurrent_merkle_tree_requests because each range will contain less data.
#
# For more details see https://issues.apache.org/jira/browse/CASSANDRA-19336.
#
# A zero value means no limit.
# concurrent_merkle_tree_requests: 0

# repair:
# # Configure the retries for each of the repair messages that support it. As of this moment retries use an exponential algorithm where each attempt sleeps longer based off the base_sleep_time and attempt.
# retries:
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ public MemtableOptions()
@Replaces(oldName = "repair_session_space_in_mb", converter = Converters.MEBIBYTES_DATA_STORAGE_INT, deprecated = true)
public volatile DataStorageSpec.IntMebibytesBound repair_session_space = null;

public volatile int concurrent_merkle_tree_requests = 0;

public volatile boolean use_offheap_merkle_trees = true;

public int storage_port = 7000;
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -3859,6 +3859,16 @@ else if (sizeInMiB > (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)))
conf.repair_session_space = new DataStorageSpec.IntMebibytesBound(sizeInMiB);
}

/**
 * Returns the maximum number of simultaneous Merkle tree (validation) requests that a
 * repair command may issue, as configured by {@code concurrent_merkle_tree_requests}.
 * A value of {@code 0} means no limit.
 */
public static int getConcurrentMerkleTreeRequests()
{
return conf.concurrent_merkle_tree_requests;
}

/**
 * Updates the maximum number of simultaneous Merkle tree (validation) requests per
 * repair command. The backing config field is volatile, so the new value is visible
 * to subsequently created repair coordinators without a restart.
 *
 * @param value new limit; {@code 0} disables the limit
 */
public static void setConcurrentMerkleTreeRequests(int value)
{
conf.concurrent_merkle_tree_requests = value;
}

public static int getPaxosRepairParallelism()
{
return conf.paxos_repair_parallelism;
Expand Down
5 changes: 4 additions & 1 deletion src/java/org/apache/cassandra/repair/AbstractRepairTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ protected AbstractRepairTask(RepairCoordinator coordinator)
private List<RepairSession> submitRepairSessions(TimeUUID parentSession,
boolean isIncremental,
ExecutorPlus executor,
Scheduler validationScheduler,
List<CommonRange> commonRanges,
String... cfnames)
{
Expand All @@ -76,6 +77,7 @@ private List<RepairSession> submitRepairSessions(TimeUUID parentSession,
options.repairPaxos(),
options.paxosOnly(),
executor,
validationScheduler,
cfnames);
if (session == null)
continue;
Expand All @@ -88,10 +90,11 @@ private List<RepairSession> submitRepairSessions(TimeUUID parentSession,
protected Future<CoordinatedRepairResult> runRepair(TimeUUID parentSession,
boolean isIncremental,
ExecutorPlus executor,
Scheduler validationScheduler,
List<CommonRange> commonRanges,
String... cfnames)
{
List<RepairSession> allSessions = submitRepairSessions(parentSession, isIncremental, executor, commonRanges, cfnames);
List<RepairSession> allSessions = submitRepairSessions(parentSession, isIncremental, executor, validationScheduler, commonRanges, cfnames);
List<Collection<Range<Token>>> ranges = Lists.transform(allSessions, RepairSession::ranges);
Future<List<RepairSessionResult>> f = FutureCombiner.successfulOf(allSessions);
return f.map(results -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public String name()
}

@Override
public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor) throws Exception
public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor, Scheduler validationScheduler) throws Exception
{
// the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
Expand All @@ -64,7 +64,7 @@ public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor) thro

CoordinatorSession coordinatorSession = coordinator.ctx.repair().consistent.coordinated.registerSession(parentSession, allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants);

return coordinatorSession.execute(() -> runRepair(parentSession, true, executor, allRanges, cfnames));
return coordinatorSession.execute(() -> runRepair(parentSession, true, executor, validationScheduler, allRanges, cfnames));

}
}
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/repair/NormalRepairTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public String name()
}

/**
 * Executes a normal (non-incremental) repair: submits repair sessions over the
 * precomputed common ranges and column families, routing validation (Merkle tree)
 * work through the supplied scheduler so that concurrent Merkle tree requests can
 * be capped per the {@code concurrent_merkle_tree_requests} setting.
 *
 * @param executor            executor on which repair sessions run
 * @param validationScheduler gates how many validation requests run simultaneously
 * @return future completing with the coordinated result of all repair sessions
 */
@Override
public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor, Scheduler validationScheduler)
{
// isIncremental = false: this task always performs a full (non-incremental) repair
return runRepair(parentSession, false, executor, validationScheduler, commonRanges, cfnames);
}
}
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/repair/PreviewRepairTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ public String successMessage()
}

@Override
public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor)
public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor, Scheduler validationScheduler)
{
Future<CoordinatedRepairResult> f = runRepair(parentSession, false, executor, commonRanges, cfnames);
Future<CoordinatedRepairResult> f = runRepair(parentSession, false, executor, validationScheduler, commonRanges, cfnames);
return f.map(result -> {
if (result.hasFailed())
return result;
Expand Down
4 changes: 3 additions & 1 deletion src/java/org/apache/cassandra/repair/RepairCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class RepairCoordinator implements Runnable, ProgressEventNotifier, Repai
private final List<ProgressListener> listeners = new ArrayList<>();
private final AtomicReference<Throwable> firstError = new AtomicReference<>(null);
final SharedContext ctx;
final Scheduler validationScheduler;

private TraceState traceState;

Expand All @@ -116,6 +117,7 @@ public RepairCoordinator(StorageService storageService, int cmd, RepairOption op
int cmd, RepairOption options, String keyspace)
{
this.ctx = ctx;
this.validationScheduler = Scheduler.build(DatabaseDescriptor.getConcurrentMerkleTreeRequests());
this.state = new CoordinatorState(ctx.clock(), cmd, keyspace, options);
this.tag = "repair:" + cmd;
this.validColumnFamilies = validColumnFamilies;
Expand Down Expand Up @@ -453,7 +455,7 @@ else if (state.options.isIncremental())

ExecutorPlus executor = createExecutor();
state.phase.repairSubmitted();
return task.perform(executor)
return task.perform(executor, validationScheduler)
// after adding the callback java could no longer infer the type...
.<Pair<CoordinatedRepairResult, Supplier<String>>>map(r -> Pair.create(r, task::successMessage))
.addCallback((s, f) -> executor.shutdown());
Expand Down