Skip to content

Commit

Permalink
Clean up repair code
Browse files Browse the repository at this point in the history
patch by Simon Zhou; reviewed by Ekaterina Dimitrova and Andrés de la Peña for CASSANDRA-13720
  • Loading branch information
simon-zhou authored and ekaterinadimitrova2 committed Aug 18, 2021
1 parent 5b325b8 commit fd3eb4f
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.1
* Clean up repair code (CASSANDRA-13720)
* Background schedule to clean up orphaned hints files (CASSANDRA-16815)
* Modify SecondaryIndexManager#indexPartition() to retrieve only columns for which indexes are actually being built (CASSANDRA-16776)
* Batch the token metadata update to improve the speed (CASSANDRA-15291)
Expand Down
18 changes: 4 additions & 14 deletions src/java/org/apache/cassandra/repair/RepairRunnable.java
Expand Up @@ -309,7 +309,7 @@ private TraceState maybeCreateTraceState(Iterable<ColumnFamilyStore> columnFamil
traceState.enableActivityNotification(tag);
for (ProgressListener listener : listeners)
traceState.addProgressListener(listener);
Thread queryThread = createQueryThread(cmd, sessionId);
Thread queryThread = createQueryThread(sessionId);
queryThread.setName("RepairTracePolling");
queryThread.start();
return traceState;
Expand Down Expand Up @@ -412,15 +412,13 @@ private void repair(String[] cfnames, NeighborsAndRanges neighborsAndRanges)
if (options.isPreview())
{
previewRepair(parentSession,
creationTimeMillis,
neighborsAndRanges.filterCommonRanges(keyspace, cfnames),
neighborsAndRanges.participants,
cfnames);
}
else if (options.isIncremental())
{
incrementalRepair(parentSession,
creationTimeMillis,
traceState,
neighborsAndRanges,
neighborsAndRanges.participants,
Expand All @@ -429,7 +427,6 @@ else if (options.isIncremental())
else
{
normalRepair(parentSession,
creationTimeMillis,
traceState,
neighborsAndRanges.filterCommonRanges(keyspace, cfnames),
neighborsAndRanges.participants,
Expand All @@ -439,7 +436,6 @@ else if (options.isIncremental())

@SuppressWarnings("UnstableApiUsage")
private void normalRepair(UUID parentSession,
long startTime,
TraceState traceState,
List<CommonRange> commonRanges,
Set<InetAddressAndPort> preparedEndpoints,
Expand Down Expand Up @@ -485,15 +481,13 @@ public ListenableFuture apply(List<RepairSessionResult> results)
new RepairCompleteCallback(parentSession,
successfulRanges,
preparedEndpoints,
startTime,
traceState,
hasFailure,
executor),
MoreExecutors.directExecutor());
}

private void incrementalRepair(UUID parentSession,
long startTime,
TraceState traceState,
NeighborsAndRanges neighborsAndRanges,
Set<InetAddressAndPort> preparedEndpoints,
Expand Down Expand Up @@ -528,12 +522,11 @@ private void incrementalRepair(UUID parentSession,
ranges.addAll(range);
}
Futures.addCallback(repairResult,
new RepairCompleteCallback(parentSession, ranges, preparedEndpoints, startTime, traceState, hasFailure, executor),
new RepairCompleteCallback(parentSession, ranges, preparedEndpoints, traceState, hasFailure, executor),
MoreExecutors.directExecutor());
}

private void previewRepair(UUID parentSession,
long startTime,
List<CommonRange> commonRanges,
Set<InetAddressAndPort> preparedEndpoints,
String... cfnames)
Expand Down Expand Up @@ -569,7 +562,7 @@ public void onSuccess(List<RepairSessionResult> results)
}
else
{
message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString();
message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary;
RepairMetrics.previewFailures.inc();
if (previewKind == PreviewKind.REPAIRED)
maybeSnapshotReplicas(parentSession, keyspace, results);
Expand Down Expand Up @@ -727,23 +720,20 @@ private class RepairCompleteCallback implements FutureCallback<Object>
final UUID parentSession;
final Collection<Range<Token>> successfulRanges;
final Set<InetAddressAndPort> preparedEndpoints;
final long startTime;
final TraceState traceState;
final AtomicBoolean hasFailure;
final ExecutorService executor;

public RepairCompleteCallback(UUID parentSession,
Collection<Range<Token>> successfulRanges,
Set<InetAddressAndPort> preparedEndpoints,
long startTime,
TraceState traceState,
AtomicBoolean hasFailure,
ExecutorService executor)
{
this.parentSession = parentSession;
this.successfulRanges = successfulRanges;
this.preparedEndpoints = preparedEndpoints;
this.startTime = startTime;
this.traceState = traceState;
this.hasFailure = hasFailure;
this.executor = executor;
Expand Down Expand Up @@ -791,7 +781,7 @@ private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Ran
neighborRangeList.add(new CommonRange(endpoints, transEndpoints, ranges));
}

private Thread createQueryThread(final int cmd, final UUID sessionId)
private Thread createQueryThread(final UUID sessionId)
{
return NamedThreadFactory.createThread(new WrappedRunnable()
{
Expand Down

0 comments on commit fd3eb4f

Please sign in to comment.