Commit

Address review comments
adelapena committed Feb 6, 2024
1 parent 8401480 commit 79703b9
Showing 4 changed files with 59 additions and 143 deletions.
@@ -117,7 +117,7 @@ public RepairCoordinator(StorageService storageService, int cmd, RepairOption op
                              int cmd, RepairOption options, String keyspace)
     {
         this.ctx = ctx;
-        this.validationScheduler = Scheduler.build(DatabaseDescriptor.getConcurrentMerkleTreeRequests(), ctx);
+        this.validationScheduler = Scheduler.build(DatabaseDescriptor.getConcurrentMerkleTreeRequests());
         this.state = new CoordinatorState(ctx.clock(), cmd, keyspace, options);
         this.tag = "repair:" + cmd;
         this.validColumnFamilies = validColumnFamilies;
src/java/org/apache/cassandra/repair/RepairJob.java (43 changes: 11 additions & 32 deletions)
@@ -20,11 +20,9 @@
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
-import java.util.function.Predicate;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
-
-
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -42,16 +40,16 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.asymmetric.DifferenceHolder;
 import org.apache.cassandra.repair.asymmetric.HostDifferences;
 import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter;
 import org.apache.cassandra.repair.asymmetric.ReduceHelper;
-import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.SystemDistributedKeyspace;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.paxos.cleanup.PaxosCleanup;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTrees;
@@ -141,7 +139,7 @@ public void run()

         if (session.paxosOnly)
         {
-            paxosRepair.addCallback(new FutureCallback<Void>()
+            paxosRepair.addCallback(new FutureCallback<>()
             {
                 public void onSuccess(Void v)
                 {
@@ -195,23 +193,9 @@ public void onFailure(Throwable t)
         }
 
         // Run validations and the creation of sync tasks in the scheduler, so it can limit the number of Merkle trees
-        Scheduler.Task<List<SyncTask>> syncTasks = new Scheduler.Task<>()
-        {
-            @Override
-            public void run()
-            {
-                createSyncTasks(paxosRepair, allSnapshotTasks, allEndpoints).addCallback((s, f) -> {
-                    if (f != null)
-                        tryFailure(f);
-                    else
-                        trySuccess(s);
-                });
-            }
-        };
-        session.validationScheduler.schedule(session.getId(), taskExecutor, syncTasks);
-
-        // When all validations complete, submit sync tasks
-        Future<List<SyncStat>> syncResults = syncTasks.flatMap(this::executeTasks, taskExecutor);
+        // that there are in memory at once. When all validations complete, submit sync tasks out of the scheduler.
+        Future<List<SyncStat>> syncResults = session.validationScheduler.schedule(() -> createSyncTasks(paxosRepair, allSnapshotTasks, allEndpoints), taskExecutor)
+                                                                        .flatMap(this::executeTasks, taskExecutor);
 
         // When all sync complete, set the final result
         syncResults.addCallback(new FutureCallback<>()
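For context, the call shape this hunk adopts looks like the following sketch. The names are illustrative stand-ins, not code from the patch; the behavioral point is that the supplier is deferred until the scheduler grants a slot, while the returned Future can still be chained with flatMap as above:

    // Hedged sketch of the new calling pattern (illustrative names).
    Scheduler scheduler = Scheduler.build(2); // allow at most two scheduled tasks in flight
    Future<List<SyncTask>> syncTasks = scheduler.schedule(() -> createSyncTasks(paxosRepair, allSnapshotTasks, allEndpoints), taskExecutor);
    Future<List<SyncStat>> syncResults = syncTasks.flatMap(this::executeTasks, taskExecutor);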
@@ -583,12 +567,7 @@ private Future<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetA
         for (InetAddressAndPort endpoint : endpoints)
         {
             String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint);
-            Queue<InetAddressAndPort> queue = requestsByDatacenter.get(dc);
-            if (queue == null)
-            {
-                queue = new LinkedList<>();
-                requestsByDatacenter.put(dc, queue);
-            }
+            Queue<InetAddressAndPort> queue = requestsByDatacenter.computeIfAbsent(dc, k -> new LinkedList<>());
             queue.add(endpoint);
         }
 
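The replacement is behavior-preserving: computeIfAbsent creates and inserts the queue only when the key is missing, and returns the existing queue otherwise. A self-contained illustration, with plain String keys standing in for datacenters and endpoints:

    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.Map;
    import java.util.Queue;

    public class ComputeIfAbsentDemo
    {
        public static void main(String[] args)
        {
            Map<String, Queue<String>> requestsByDatacenter = new HashMap<>();

            // Old shape: explicit lookup, null check, insert.
            Queue<String> queue = requestsByDatacenter.get("dc1");
            if (queue == null)
            {
                queue = new LinkedList<>();
                requestsByDatacenter.put("dc1", queue);
            }
            queue.add("endpoint-1");

            // New shape: the mapping function runs only when the key is absent,
            // so this call reuses the queue created above.
            requestsByDatacenter.computeIfAbsent("dc1", k -> new LinkedList<>()).add("endpoint-2");

            System.out.println(requestsByDatacenter); // {dc1=[endpoint-1, endpoint-2]}
        }
    }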
@@ -606,7 +585,7 @@ private Future<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetA
             final InetAddressAndPort nextAddress = requests.poll();
             final ValidationTask nextTask = newValidationTask(nextAddress, nowInSec);
             tasks.add(nextTask);
-            currentTask.addCallback(new FutureCallback<TreeResponse>()
+            currentTask.addCallback(new FutureCallback<>()
             {
                 public void onSuccess(TreeResponse result)
                 {
src/java/org/apache/cassandra/repair/Scheduler.java (155 changes: 46 additions & 109 deletions)
@@ -18,164 +18,101 @@

 package org.apache.cassandra.repair;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import javax.annotation.concurrent.GuardedBy;
 
-import org.agrona.collections.LongArrayList;
-import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.Future;
 
 /**
  * Task scheduler that limits the number of concurrent tasks across multiple executors.
  */
 public interface Scheduler
 {
-    void schedule(TimeUUID sessionId, Executor taskExecutor, Task<?> tasks);
+    default <T> Future<T> schedule(Supplier<Future<T>> task, Executor executor)
+    {
+        return schedule(new Task<>(task), executor);
+    }
+
+    <T> Task<T> schedule(Task<T> task, Executor executor);
 
-    static Scheduler build(int concurrentValidations, SharedContext ctx)
+    static Scheduler build(int concurrentValidations)
     {
         return concurrentValidations <= 0
                ? new NoopScheduler()
-               : new LimitedConcurrentScheduler(concurrentValidations, ctx);
+               : new LimitedConcurrentScheduler(concurrentValidations);
     }
 
     final class NoopScheduler implements Scheduler
     {
         @Override
-        public void schedule(TimeUUID sessionId, Executor taskExecutor, Task<?> tasks)
+        public <T> Task<T> schedule(Task<T> task, Executor executor)
         {
-            taskExecutor.execute(tasks);
+            executor.execute(task);
+            return task;
         }
     }
 
     final class LimitedConcurrentScheduler implements Scheduler
     {
         private final int concurrentValidations;
-        private final SharedContext ctx;
         @GuardedBy("this")
         private int inflight = 0;
         @GuardedBy("this")
-        private final Map<TimeUUID, Group> groups = new HashMap<>();
+        private final Queue<Pair<Task<?>, Executor>> tasks = new LinkedList<>();
 
-        LimitedConcurrentScheduler(int concurrentValidations, SharedContext ctx)
+        LimitedConcurrentScheduler(int concurrentValidations)
         {
             this.concurrentValidations = concurrentValidations;
-            this.ctx = ctx;
         }
 
         @Override
-        public synchronized void schedule(TimeUUID sessionId, Executor taskExecutor, Task<?> tasks)
+        public synchronized <T> Task<T> schedule(Task<T> task, Executor executor)
         {
-            groups.computeIfAbsent(sessionId, ignore -> new Group(sessionId, taskExecutor)).add(tasks);
+            tasks.offer(Pair.create(task, executor));
             maybeSchedule();
+            return task;
         }
 
-        private synchronized void onDone(Group group, long durationNs)
+        private synchronized void onDone()
        {
-            group.update(durationNs);
             inflight--;
             maybeSchedule();
         }
 
         private void maybeSchedule()
         {
-            if (inflight == concurrentValidations)
-                return;
-            Group smallest = null;
-            long smallestScore = -1;
-            for (var g : groups.values())
-            {
-                if (g.isEmpty())
-                    continue;
-                if (smallest == null)
-                {
-                    smallest = g;
-                    smallestScore = g.score();
-                }
-                else
-                {
-                    var score = g.score();
-                    if (score < smallestScore)
-                    {
-                        smallest = g;
-                        smallestScore = score;
-                    }
-                }
-            }
-            if (smallest == null)
+            if (inflight == concurrentValidations || tasks.isEmpty())
                 return;
             inflight++;
-            smallest.executeNext();
-        }
-
-        private class Group
-        {
-            private final TimeUUID sessionId;
-            private final Executor taskExecutor;
-            private final List<Task<?>> tasks = new ArrayList<>();
-            private final LongArrayList durations = new LongArrayList();
-            private int inflight = 0;
-            private int completed = 0;
-
-            private Group(TimeUUID sessionId, Executor taskExecutor)
-            {
-                this.sessionId = sessionId;
-                this.taskExecutor = taskExecutor;
-            }
-
-            public long score()
-            {
-                if (tasks.isEmpty())
-                    return -1;
-                long avgDuration = (long) durations.longStream().average().orElse(TimeUnit.HOURS.toNanos(1));
-                return tasks.size() * avgDuration;
-            }
-
-            public void executeNext()
-            {
-                Task<?> task = tasks.get(0);
-                tasks.remove(0);
-                inflight++;
-                var startNs = ctx.clock().nanoTime();
-                task.addCallback((s, f) -> onDone(this, ctx.clock().nanoTime() - startNs));
-                taskExecutor.execute(task);
-            }
-
-            public void add(Task<?> task)
-            {
-                tasks.add(task);
-            }
-
-            private void update(long durationNs)
-            {
-                durations.add(durationNs);
-                inflight--;
-                completed++;
-            }
-
-            public boolean isEmpty()
-            {
-                return tasks.isEmpty();
-            }
-
-            @Override
-            public String toString()
-            {
-                return "Group{" +
-                       "sessionId=" + sessionId +
-                       ", tasks=" + tasks.size() +
-                       ", durations=" + durations.longStream().average().orElse(-1) +
-                       ", score=" + score() +
-                       ", inflight=" + inflight +
-                       ", completed=" + completed +
-                       '}';
-            }
+            Pair<Task<?>, Executor> pair = tasks.poll();
+            pair.left.addCallback((s, f) -> onDone());
+            pair.right.execute(pair.left);
         }
     }
 
-    abstract class Task<T> extends AsyncFuture<T> implements Runnable
+    class Task<T> extends AsyncFuture<T> implements Runnable
     {
+        private final Supplier<Future<T>> supplier;
+
+        public Task(Supplier<Future<T>> supplier)
+        {
+            this.supplier = supplier;
+        }
+
+        @Override
+        public void run()
+        {
+            supplier.get().addCallback((s, f) -> {
+                if (f != null)
+                    tryFailure(f);
+                else
+                    trySuccess(s);
+            });
+        }
     }
 }
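Net effect of the review feedback in this file: the per-session Group bookkeeping, which picked the session with the smallest score (pending tasks × average task duration, measured via ctx.clock()), is replaced by a single FIFO queue, which also removes the SharedContext dependency seen in the first file's hunk. A slot is released in onDone() only when a task's returned Future resolves, not when run() returns. A hedged usage sketch against the interface above (validationWork is an illustrative stand-in that returns a Future):

    // Sketch only; assumes the Scheduler interface shown in this diff.
    Executor executor = Runnable::run;        // inline executor, for illustration
    Scheduler scheduler = Scheduler.build(1); // a value <= 0 would yield a NoopScheduler

    Future<MerkleTrees> first = scheduler.schedule(() -> validationWork("range-1"), executor);
    Future<MerkleTrees> second = scheduler.schedule(() -> validationWork("range-2"), executor);
    // "range-2" is not handed to the executor until the Future returned for
    // "range-1" completes, bounding how many Merkle trees are in memory at once.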
test/unit/org/apache/cassandra/repair/RepairJobTest.java (2 changes: 1 addition & 1 deletion)
@@ -334,7 +334,7 @@ public void testValidationFailure() throws InterruptedException, TimeoutExceptio
         }
         catch (ExecutionException e)
         {
-            Assertions.assertThat(e.getCause()).isInstanceOf(RepairException.class);
+            Assertions.assertThat(e).hasRootCauseInstanceOf(RepairException.class);
         }
 
         // When the job fails, all three outstanding validation tasks should be aborted.
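The sharpened assertion matches the root cause rather than the immediate cause, presumably so the test tolerates any extra wrapping layer introduced by the scheduler's future chaining. An illustration with stand-in exceptions (AssertJ's hasRootCauseInstanceOf walks the whole cause chain):

    // Stand-in exceptions; in the test above the real target is RepairException.
    Exception failure = new java.util.concurrent.ExecutionException(
        new RuntimeException("wrapper", new IllegalStateException("root")));

    // Old style: inspects only the immediate cause, so an extra wrapper breaks it.
    org.assertj.core.api.Assertions.assertThat(failure.getCause())
                                   .isInstanceOf(RuntimeException.class);

    // New style: matches the deepest cause in the chain.
    org.assertj.core.api.Assertions.assertThat(failure)
                                   .hasRootCauseInstanceOf(IllegalStateException.class);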