Skip to content
14 changes: 7 additions & 7 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ with_dtest_jobs_only: &with_dtest_jobs_only
- build
# Set env_settings, env_vars, and workflows/build_and_run_tests based on environment
env_settings: &env_settings
<<: *default_env_settings
#<<: *high_capacity_env_settings
# <<: *default_env_settings
<<: *high_capacity_env_settings
env_vars: &env_vars
<<: *resource_constrained_env_vars
#<<: *high_capacity_env_vars
# <<: *resource_constrained_env_vars
<<: *high_capacity_env_vars
workflows:
version: 2
build_and_run_tests: *default_jobs
#build_and_run_tests: *default_jobs
#build_and_run_tests: *with_dtest_jobs_only
#build_and_run_tests: *with_dtest_jobs
build_and_run_tests: *with_dtest_jobs
docker_image: &docker_image kjellman/cassandra-test:0.4.3
version: 2
jobs:
Expand Down Expand Up @@ -206,7 +206,7 @@ jobs:
name: Clone Cassandra dtest Repository (via git)
command: |
export LANG=en_US.UTF-8
git clone --single-branch --branch master --depth 1 git://github.com/apache/cassandra-dtest.git ~/cassandra-dtest
git clone --single-branch --branch 14409 --depth 1 git://github.com/ifesdjeen/cassandra-dtest.git ~/cassandra-dtest
- run:
name: Configure virtualenv and python Dependencies
command: |
Expand Down
21 changes: 6 additions & 15 deletions src/java/org/apache/cassandra/db/DiskBoundaryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,7 @@ private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
if (localRanges == null || localRanges.isEmpty())
return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion);

// note that Range.sort unwraps any wraparound ranges, so we need to sort them here
List<Range<Token>> fullLocalRanges = Range.sort(localRanges.stream()
.filter(Replica::isFull)
.map(Replica::range)
.collect(Collectors.toList()));
List<Range<Token>> transientLocalRanges = Range.sort(localRanges.stream()
.filter(Replica::isTransient)
.map(Replica::range)
.collect(Collectors.toList()));

List<PartitionPosition> positions = getDiskBoundaries(fullLocalRanges, transientLocalRanges, cfs.getPartitioner(), dirs);
List<PartitionPosition> positions = getDiskBoundaries(localRanges, cfs.getPartitioner(), dirs);

return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion);
}
Expand All @@ -133,18 +123,19 @@ private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
*
* The final entry in the returned list will always be the partitioner maximum tokens upper key bound
*/
private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> fullRanges, List<Range<Token>> transientRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
private static List<PartitionPosition> getDiskBoundaries(RangesAtEndpoint ranges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
{
assert partitioner.splitter().isPresent();

Splitter splitter = partitioner.splitter().get();
boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;

List<Splitter.WeightedRange> weightedRanges = new ArrayList<>(fullRanges.size() + transientRanges.size());
for (Range<Token> r : fullRanges)
List<Splitter.WeightedRange> weightedRanges = new ArrayList<>(ranges.size());
// note that Range.sort unwraps any wraparound ranges, so we need to sort them here
for (Range<Token> r : Range.sort(ranges.fullRanges()))
weightedRanges.add(new Splitter.WeightedRange(1.0, r));

for (Range<Token> r : transientRanges)
for (Range<Token> r : Range.sort(ranges.transientRanges()))
weightedRanges.add(new Splitter.WeightedRange(0.1, r));

weightedRanges.sort(Comparator.comparing(Splitter.WeightedRange::left));
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/repair/AbstractSyncTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@
/**
 * Base type for repair sync tasks. A sync task is a {@link Runnable} future that
 * yields a {@link SyncStat} when the synchronization between two nodes completes.
 * NOTE(review): subclasses visible in this change (e.g. AsymmetricSyncTask) perform
 * range streaming between the two endpoints — confirm against the full class hierarchy.
 */
public abstract class AbstractSyncTask extends AbstractFuture<SyncStat> implements Runnable
{
// Kicks off synchronization of the given token ranges between the two nodes of this task.
protected abstract void startSync(List<Range<Token>> rangesToStream);
// The pair of endpoints participating in this sync task.
public abstract NodePair nodePair();
}
105 changes: 0 additions & 105 deletions src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java

This file was deleted.

13 changes: 11 additions & 2 deletions src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,21 +40,29 @@ public abstract class AsymmetricSyncTask extends AbstractSyncTask
protected final List<Range<Token>> rangesToFetch;
protected final InetAddressAndPort fetchingNode;
protected final PreviewKind previewKind;
protected final NodePair nodePair;

private long startTime = Long.MIN_VALUE;
protected volatile SyncStat stat;

public AsymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
{
assert !fetchFrom.equals(fetchingNode) : "Fetching from self " + fetchFrom;
Preconditions.checkArgument(!fetchFrom.equals(fetchingNode), "Sending and receiving node are the same: %s", fetchFrom);
this.desc = desc;
this.fetchFrom = fetchFrom;
this.fetchingNode = fetchingNode;
this.rangesToFetch = rangesToFetch;
this.nodePair = new NodePair(fetchingNode, fetchFrom);
// todo: make an AsymmetricSyncStat?
stat = new SyncStat(new NodePair(fetchingNode, fetchFrom), rangesToFetch.size());
stat = new SyncStat(nodePair, rangesToFetch.size());
this.previewKind = previewKind;
}

/**
 * @return the fetching node / fetch-from node pair this task synchronizes
 *         (constructed once in the constructor and reused for the SyncStat).
 */
public NodePair nodePair()
{
return nodePair;
}

public void run()
{
startTime = System.currentTimeMillis();
Expand Down
8 changes: 7 additions & 1 deletion src/java/org/apache/cassandra/repair/CommonRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ public CommonRange(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> tr

this.endpoints = ImmutableSet.copyOf(endpoints);
this.transEndpoints = ImmutableSet.copyOf(transEndpoints);
this.ranges = new ArrayList(ranges);
this.ranges = new ArrayList<>(ranges);
}

/**
 * Checks whether this range covers exactly the given full and transient endpoint sets.
 *
 * Strict set equality is deliberate: the worst consequence of a non-match is that
 * one additional stream gets generated.
 *
 * @param endpoints      candidate full endpoints to compare against
 * @param transEndpoints candidate transient endpoints to compare against
 * @return true only when both sets are exactly equal to this range's sets
 */
public boolean matchesEndpoints(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints)
{
boolean sameFullEndpoints = this.endpoints.equals(endpoints);
boolean sameTransientEndpoints = this.transEndpoints.equals(transEndpoints);
return sameFullEndpoints && sameTransientEndpoints;
}

public boolean equals(Object o)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.UUID;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,43 +40,60 @@
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTrees;

/**
* SymmetricLocalSyncTask performs streaming between local(coordinator) node and remote replica.
* LocalSyncTask performs streaming between local(coordinator) node and remote replica.
*/
public class SymmetricLocalSyncTask extends SymmetricSyncTask implements StreamEventHandler
public class LocalSyncTask extends SymmetricSyncTask implements StreamEventHandler
{
private final TraceState state = Tracing.instance.get();

private static final Logger logger = LoggerFactory.getLogger(SymmetricLocalSyncTask.class);
private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);

private final boolean remoteIsTransient;
private final UUID pendingRepair;
private final boolean pullRepair;
private final boolean requestRanges;
private final boolean transferRanges;

public SymmetricLocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, boolean remoteIsTransient, UUID pendingRepair, boolean pullRepair, PreviewKind previewKind)
public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair,
boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
{
super(desc, r1, r2, previewKind);
this.remoteIsTransient = remoteIsTransient;
this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees),
pendingRepair, requestRanges, transferRanges, previewKind);
}

public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote,
List<Range<Token>> diff, UUID pendingRepair,
boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
{
super(desc, local, remote, diff, previewKind);
Preconditions.checkArgument(requestRanges || transferRanges, "Nothing to do in a sync job");
Preconditions.checkArgument(local.equals(FBUtilities.getBroadcastAddressAndPort()));

this.pendingRepair = pendingRepair;
this.pullRepair = pullRepair;
this.requestRanges = requestRanges;
this.transferRanges = transferRanges;
}

@VisibleForTesting
StreamPlan createStreamPlan(InetAddressAndPort dst, List<Range<Token>> differences)
StreamPlan createStreamPlan(InetAddressAndPort remote, List<Range<Token>> differences)
{
StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
.listeners(this)
.flushBeforeTransfer(pendingRepair == null)
// see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
.requestRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences),
RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); // request ranges from the remote node
.flushBeforeTransfer(pendingRepair == null);

if (requestRanges)
{
// see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
plan.requestRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences),
RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily);
}

if (!pullRepair && !remoteIsTransient)
if (transferRanges)
{
// send ranges to the remote node if we are not performing a pull repair
// see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
plan.transferRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily);
plan.transferRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the idiom where it returns this isn't always used to return "this" — some of the time it's a copy. Granted, here it is returning this, but should you assign the plan field on the stack again?

I don't feel strongly about it just noticing that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I felt like this is a side-effect generating, imperative method, so ignoring its return is fine as it's done in order to facilitate chaining.

}

return plan;
Expand All @@ -88,15 +106,13 @@ StreamPlan createStreamPlan(InetAddressAndPort dst, List<Range<Token>> differenc
@Override
protected void startSync(List<Range<Token>> differences)
{
InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
// We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding
InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
InetAddressAndPort remote = endpoint2;

String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst);
String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), remote);
logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
Tracing.traceRepair(message);

createStreamPlan(dst, differences).execute();
createStreamPlan(remote, differences).execute();
}

public void handleStreamEvent(StreamEvent event)
Expand Down Expand Up @@ -127,7 +143,7 @@ public void handleStreamEvent(StreamEvent event)

public void onSuccess(StreamState result)
{
String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, endpoint1, endpoint2, desc.columnFamily);
logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
Tracing.traceRepair(message);
set(stat.withSummaries(result.createSummaries()));
Expand Down
Loading