From aadc3c6dcd65ef915c1743cb133f1065ee2618c3 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Tue, 4 Sep 2018 19:38:27 +0200 Subject: [PATCH 01/13] Imrpve DiskBoundaryManager, bring it into consistency with the rest of classes --- .../cassandra/db/DiskBoundaryManager.java | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java index acfe71aeae40..0961a4275f56 100644 --- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java +++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java @@ -109,17 +109,7 @@ private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs) if (localRanges == null || localRanges.isEmpty()) return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion); - // note that Range.sort unwraps any wraparound ranges, so we need to sort them here - List> fullLocalRanges = Range.sort(localRanges.stream() - .filter(Replica::isFull) - .map(Replica::range) - .collect(Collectors.toList())); - List> transientLocalRanges = Range.sort(localRanges.stream() - .filter(Replica::isTransient) - .map(Replica::range) - .collect(Collectors.toList())); - - List positions = getDiskBoundaries(fullLocalRanges, transientLocalRanges, cfs.getPartitioner(), dirs); + List positions = getDiskBoundaries(localRanges, cfs.getPartitioner(), dirs); return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion); } @@ -133,18 +123,19 @@ private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs) * * The final entry in the returned list will always be the partitioner maximum tokens upper key bound */ - private static List getDiskBoundaries(List> fullRanges, List> transientRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories) + private static List getDiskBoundaries(RangesAtEndpoint ranges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories) { assert partitioner.splitter().isPresent(); Splitter splitter = partitioner.splitter().get(); boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1; - List weightedRanges = new ArrayList<>(fullRanges.size() + transientRanges.size()); - for (Range r : fullRanges) + List weightedRanges = new ArrayList<>(ranges.size()); + // note that Range.sort unwraps any wraparound ranges, so we need to sort them here + for (Range r : Range.sort(ranges.fullRanges())) weightedRanges.add(new Splitter.WeightedRange(1.0, r)); - for (Range r : transientRanges) + for (Range r : Range.sort(ranges.transientRanges())) weightedRanges.add(new Splitter.WeightedRange(0.1, r)); weightedRanges.sort(Comparator.comparing(Splitter.WeightedRange::left)); From c1a73a5eb799ee4d12a580af848d72eda21fd7b2 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Tue, 4 Sep 2018 19:39:19 +0200 Subject: [PATCH 02/13] Improve repair scheduling --- .../cassandra/repair/AbstractSyncTask.java | 1 + .../cassandra/repair/AsymmetricSyncTask.java | 13 +- .../apache/cassandra/repair/RepairJob.java | 161 +++++++++--------- .../cassandra/repair/RepairRunnable.java | 26 +-- .../cassandra/repair/SymmetricSyncTask.java | 10 +- .../service/ActiveRepairService.java | 39 +++-- 6 files changed, 140 insertions(+), 110 deletions(-) diff --git a/src/java/org/apache/cassandra/repair/AbstractSyncTask.java b/src/java/org/apache/cassandra/repair/AbstractSyncTask.java index 124baa17a5d4..b11d93b114a7 100644 --- a/src/java/org/apache/cassandra/repair/AbstractSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AbstractSyncTask.java @@ -28,4 +28,5 @@ public abstract class AbstractSyncTask extends AbstractFuture implements Runnable { protected abstract void startSync(List> rangesToStream); + public abstract NodePair nodePair(); } diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java index 35474afbadbf..c1800561358a 100644 --- a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java @@ -39,6 +39,8 @@ public abstract class AsymmetricSyncTask extends AbstractSyncTask protected final List> rangesToFetch; protected final InetAddressAndPort fetchingNode; protected final PreviewKind previewKind; + protected final NodePair nodePair; + private long startTime = Long.MIN_VALUE; protected volatile SyncStat stat; @@ -49,9 +51,16 @@ public AsymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort fetchingNode, I this.fetchFrom = fetchFrom; this.fetchingNode = fetchingNode; this.rangesToFetch = rangesToFetch; + this.nodePair = new NodePair(fetchingNode, fetchFrom); // todo: make an AsymmetricSyncStat? - stat = new SyncStat(new NodePair(fetchingNode, fetchFrom), rangesToFetch.size()); + stat = new SyncStat(nodePair, rangesToFetch.size()); this.previewKind = previewKind; + + } + + public NodePair nodePair() + { + return nodePair; } public void run() @@ -78,4 +87,6 @@ protected void finished() if (startTime != Long.MIN_VALUE) Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); } + + } diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index d38435b65744..1ba7349feda9 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -128,7 +128,7 @@ public ListenableFuture> apply(List endpo } // When all validations complete, submit sync tasks - ListenableFuture> syncResults = Futures.transformAsync(validations, optimiseStreams && !session.pullRepair ? optimisedSyncing() : standardSyncing(), taskExecutor); + ListenableFuture> syncResults = Futures.transformAsync(validations, optimiseStreams && !session.pullRepair ? this::optimisedSyncing : this::standardSyncing, taskExecutor); // When all sync complete, set the final result Futures.addCallback(syncResults, new FutureCallback>() @@ -165,107 +165,104 @@ private boolean isTransient(InetAddressAndPort ep) return session.commonRange.transEndpoints.contains(ep); } - private AsyncFunction, List> standardSyncing() + private ListenableFuture> standardSyncing(List trees) { - return trees -> - { - InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); + InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); - List syncTasks = new ArrayList<>(); - // We need to difference all trees one against another - for (int i = 0; i < trees.size() - 1; ++i) + List syncTasks = new ArrayList<>(); + // We need to difference all trees one against another + for (int i = 0; i < trees.size() - 1; ++i) + { + TreeResponse r1 = trees.get(i); + for (int j = i + 1; j < trees.size(); ++j) { - TreeResponse r1 = trees.get(i); - for (int j = i + 1; j < trees.size(); ++j) - { - TreeResponse r2 = trees.get(j); + TreeResponse r2 = trees.get(j); - if (isTransient(r1.endpoint) && isTransient(r2.endpoint)) - continue; + if (isTransient(r1.endpoint) && isTransient(r2.endpoint)) + continue; - AbstractSyncTask task; - if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) - { - InetAddressAndPort remote = r1.endpoint.equals(local) ? r2.endpoint : r1.endpoint; - task = new SymmetricLocalSyncTask(desc, r1, r2, isTransient(remote), isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind); - } - else if (isTransient(r1.endpoint) || isTransient(r2.endpoint)) - { - TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : r2; - TreeResponse streamTo = isTransient(r1.endpoint) ? r2: r1; - task = new AsymmetricRemoteSyncTask(desc, streamTo, streamFrom, previewKind); - session.waitForSync(Pair.create(desc, new NodePair(streamTo.endpoint, streamFrom.endpoint)), (AsymmetricRemoteSyncTask) task); - } - else - { - task = new SymmetricRemoteSyncTask(desc, r1, r2, session.previewKind); - // SymmetricRemoteSyncTask expects SyncComplete message sent back. - // Register task to RepairSession to receive response. - session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (SymmetricRemoteSyncTask) task); - } - syncTasks.add(task); - taskExecutor.submit(task); + AbstractSyncTask task; + if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) + { + TreeResponse streamFrom = r1.endpoint.equals(local) ? r1 : r2; + TreeResponse streamTo = r2.endpoint.equals(local) ? r2 : r1; + assert !isTransient(streamFrom.endpoint) : "Coordination from a transient replica is not supported"; + + task = new SymmetricLocalSyncTask(desc, streamFrom, streamTo, isTransient(streamTo.endpoint), isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind); + } + else if (isTransient(r1.endpoint) || isTransient(r2.endpoint)) + { + TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : r2; + TreeResponse streamTo = isTransient(r1.endpoint) ? r2 : r1; + task = new AsymmetricRemoteSyncTask(desc, streamTo, streamFrom, previewKind); + session.waitForSync(Pair.create(desc, task.nodePair()), (AsymmetricRemoteSyncTask) task); + } + else + { + task = new SymmetricRemoteSyncTask(desc, r1, r2, session.previewKind); + // SymmetricRemoteSyncTask expects SyncComplete message sent back. + // Register task to RepairSession to receive response. + session.waitForSync(Pair.create(desc, task.nodePair()), (SymmetricRemoteSyncTask) task); } + syncTasks.add(task); + taskExecutor.submit(task); } - return Futures.allAsList(syncTasks); - }; + } + return Futures.allAsList(syncTasks); } - private AsyncFunction, List> optimisedSyncing() + private ListenableFuture> optimisedSyncing(List trees) { - return trees -> - { - InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); + InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); - List syncTasks = new ArrayList<>(); - // We need to difference all trees one against another - DifferenceHolder diffHolder = new DifferenceHolder(trees); + List syncTasks = new ArrayList<>(); + // We need to difference all trees one against another + DifferenceHolder diffHolder = new DifferenceHolder(trees); - logger.debug("diffs = {}", diffHolder); - PreferedNodeFilter preferSameDCFilter = (streaming, candidates) -> - candidates.stream() - .filter(node -> getDC(streaming) - .equals(getDC(node))) - .collect(Collectors.toSet()); - ImmutableMap reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter); + logger.debug("diffs = {}", diffHolder); + PreferedNodeFilter preferSameDCFilter = (streaming, candidates) -> + candidates.stream() + .filter(node -> getDC(streaming) + .equals(getDC(node))) + .collect(Collectors.toSet()); + ImmutableMap reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter); - for (int i = 0; i < trees.size(); i++) - { - InetAddressAndPort address = trees.get(i).endpoint; + for (int i = 0; i < trees.size(); i++) + { + InetAddressAndPort address = trees.get(i).endpoint; - // we don't stream to transient replicas - if (isTransient(address)) - continue; + // we don't stream to transient replicas + if (isTransient(address)) + continue; - HostDifferences streamsFor = reducedDifferences.get(address); - if (streamsFor != null) + HostDifferences streamsFor = reducedDifferences.get(address); + if (streamsFor != null) + { + assert streamsFor.get(address).isEmpty() : "We should not fetch ranges from ourselves"; + for (InetAddressAndPort fetchFrom : streamsFor.hosts()) { - assert streamsFor.get(address).isEmpty() : "We should not fetch ranges from ourselves"; - for (InetAddressAndPort fetchFrom : streamsFor.hosts()) + List> toFetch = streamsFor.get(fetchFrom); + logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom); + AsymmetricSyncTask task; + if (address.equals(local)) { - List> toFetch = streamsFor.get(fetchFrom); - logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom); - AsymmetricSyncTask task; - if (address.equals(local)) - { - task = new AsymmetricLocalSyncTask(desc, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null, previewKind); - } - else - { - task = new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind); - session.waitForSync(Pair.create(desc, new NodePair(address, fetchFrom)),(AsymmetricRemoteSyncTask)task); - } - syncTasks.add(task); - taskExecutor.submit(task); + task = new AsymmetricLocalSyncTask(desc, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null, previewKind); } - } - else - { - logger.debug("Node {} has nothing to stream", address); + else + { + task = new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind); + session.waitForSync(Pair.create(desc, new NodePair(address, fetchFrom)), (AsymmetricRemoteSyncTask) task); + } + syncTasks.add(task); + taskExecutor.submit(task); } } - return Futures.allAsList(syncTasks); - }; + else + { + logger.debug("Node {} has nothing to stream", address); + } + } + return Futures.allAsList(syncTasks); } private String getDC(InetAddressAndPort address) diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 8d3cd54aa37c..34e5ca89bb42 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -196,16 +196,16 @@ protected void runMayThrow() throws Exception Set allNeighbors = new HashSet<>(); List commonRanges = new ArrayList<>(); - //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent - //calculation multiple times - // we arbitrarily limit ourselves to only full replicas, in lieu of ensuring it is safe to coordinate from a transient replica - Iterable> keyspaceLocalRanges = storageService - .getLocalReplicas(keyspace) - .filter(Replica::isFull) - .ranges(); - try { + //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent + //calculation multiple times + // we arbitrarily limit ourselves to only full replicas, in lieu of ensuring it is safe to coordinate from a transient replica + Iterable> keyspaceLocalRanges = storageService + .getLocalReplicas(keyspace) + .filter(Replica::isFull) + .ranges(); + for (Range range : options.getRanges()) { EndpointsForRange neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, @@ -647,15 +647,15 @@ private void recordFailure(String failureMessage, String completionMessage) ImmutableList.of(failureMessage, completionMessage)); } - private void addRangeToNeighbors(List neighborRangeList, Range range, EndpointsForRange neighbors) + private static void addRangeToNeighbors(List neighborRangeList, Range range, EndpointsForRange neighbors) { Set endpoints = neighbors.endpoints(); Set transEndpoints = neighbors.filter(Replica::isTransient).endpoints(); - for (int i = 0; i < neighborRangeList.size(); i++) - { - CommonRange cr = neighborRangeList.get(i); - if (cr.endpoints.containsAll(endpoints) && cr.transEndpoints.containsAll(transEndpoints)) + for (CommonRange cr : neighborRangeList) + { + // Use strict equality here, as worst thing that can happen is we generate one more stream + if (Iterables.elementsEqual(cr.endpoints, endpoints) && Iterables.elementsEqual(cr.transEndpoints, transEndpoints)) { cr.ranges.add(range); return; diff --git a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java index 3da2293d00f0..423f98f419fb 100644 --- a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java +++ b/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java @@ -42,7 +42,7 @@ public abstract class SymmetricSyncTask extends AbstractSyncTask protected final TreeResponse r1; protected final TreeResponse r2; protected final PreviewKind previewKind; - + protected final NodePair nodePair; protected volatile SyncStat stat; protected long startTime = Long.MIN_VALUE; @@ -52,6 +52,7 @@ public SymmetricSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, P this.r1 = r1; this.r2 = r2; this.previewKind = previewKind; + this.nodePair = new NodePair(r1.endpoint, r2.endpoint); } /** @@ -63,7 +64,7 @@ public void run() // compare trees, and collect differences List> differences = MerkleTrees.difference(r1.trees, r2.trees); - stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size()); + stat = new SyncStat(nodePair, differences.size()); // choose a repair method based on the significance of the difference String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), r1.endpoint, r2.endpoint, desc.columnFamily); @@ -81,6 +82,11 @@ public void run() startSync(differences); } + public NodePair nodePair() + { + return nodePair; + } + public SyncStat getCurrentStat() { return stat; diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 9f370951999f..f7472aeaf8b5 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -20,10 +20,25 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import javax.management.MBeanServer; import javax.management.ObjectName; @@ -33,13 +48,9 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractFuture; - import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import org.apache.cassandra.locator.EndpointsByRange; -import org.apache.cassandra.locator.EndpointsForRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,27 +68,31 @@ import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; import org.apache.cassandra.gms.IFailureDetectionEventListener; +import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.EndpointsByRange; +import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.Replicas; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.CommonRange; -import org.apache.cassandra.repair.RepairRunnable; -import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.repair.RepairSession; import org.apache.cassandra.repair.consistent.CoordinatorSessions; import org.apache.cassandra.repair.consistent.LocalSessions; -import org.apache.cassandra.repair.messages.*; +import org.apache.cassandra.repair.messages.PrepareMessage; +import org.apache.cassandra.repair.messages.RepairMessage; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.repair.messages.SyncComplete; +import org.apache.cassandra.repair.messages.ValidationComplete; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; From e47019f838f69694d943220e3fa2f42839dd8019 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Tue, 4 Sep 2018 19:40:08 +0200 Subject: [PATCH 03/13] Avoid filtering already --- .../apache/cassandra/service/reads/AbstractReadExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index 031326ef28af..5543fcc3136e 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -113,7 +113,7 @@ public ReadRepair getReadRepair() protected void makeFullDataRequests(ReplicaCollection replicas) { assert all(replicas, Replica::isFull); - makeRequests(command, replicas.filter(Replica::isFull)); + makeRequests(command, replicas); } protected void makeTransientDataRequests(ReplicaCollection replicas) From 3e61a44a6e4bc4ba0dd32b588905b01bb64aff7f Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Wed, 5 Sep 2018 13:11:51 +0200 Subject: [PATCH 04/13] Allow transient range owner to be repair coordinator --- .../apache/cassandra/repair/RepairJob.java | 18 ++++++++--- .../cassandra/repair/RepairRunnable.java | 6 +--- .../repair/SymmetricLocalSyncTask.java | 31 +++++++++++-------- .../repair/SymmetricLocalSyncTaskTest.java | 31 +++++++++---------- 4 files changed, 48 insertions(+), 38 deletions(-) diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 1ba7349feda9..30bc0850393b 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -178,20 +178,30 @@ private ListenableFuture> standardSyncing(List tree { TreeResponse r2 = trees.get(j); + // Avoid streming between two tansient replicas if (isTransient(r1.endpoint) && isTransient(r2.endpoint)) continue; AbstractSyncTask task; if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) { - TreeResponse streamFrom = r1.endpoint.equals(local) ? r1 : r2; - TreeResponse streamTo = r2.endpoint.equals(local) ? r2 : r1; - assert !isTransient(streamFrom.endpoint) : "Coordination from a transient replica is not supported"; + TreeResponse self = r1.endpoint.equals(local) ? r1 : r2; + TreeResponse remote = r2.endpoint.equals(local) ? r1 : r2; - task = new SymmetricLocalSyncTask(desc, streamFrom, streamTo, isTransient(streamTo.endpoint), isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind); + // pull only if local is full + boolean requestRanges = !isTransient(self.endpoint); + // push only if remote is full; additionally check for pull repair + boolean transfterRanges = !isTransient(remote.endpoint) && !session.pullRepair; + + // Nothing to do + if (!requestRanges && !transfterRanges) + continue; + + task = new SymmetricLocalSyncTask(desc, self, remote, isIncremental ? desc.parentSessionId : null, requestRanges, transfterRanges, session.previewKind); } else if (isTransient(r1.endpoint) || isTransient(r2.endpoint)) { + // Stream only from transient replica TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : r2; TreeResponse streamTo = isTransient(r1.endpoint) ? r2 : r1; task = new AsymmetricRemoteSyncTask(desc, streamTo, streamFrom, previewKind); diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 34e5ca89bb42..132b44f4c954 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -200,11 +200,7 @@ protected void runMayThrow() throws Exception { //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent //calculation multiple times - // we arbitrarily limit ourselves to only full replicas, in lieu of ensuring it is safe to coordinate from a transient replica - Iterable> keyspaceLocalRanges = storageService - .getLocalReplicas(keyspace) - .filter(Replica::isFull) - .ranges(); + Iterable> keyspaceLocalRanges = storageService.getLocalReplicas(keyspace).ranges(); for (Range range : options.getRanges()) { diff --git a/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java index 7eedab761eac..2eeb438444a2 100644 --- a/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java @@ -49,16 +49,19 @@ public class SymmetricLocalSyncTask extends SymmetricSyncTask implements StreamE private static final Logger logger = LoggerFactory.getLogger(SymmetricLocalSyncTask.class); - private final boolean remoteIsTransient; private final UUID pendingRepair; - private final boolean pullRepair; + private final boolean requestRanges; + private final boolean transferRanges; - public SymmetricLocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, boolean remoteIsTransient, UUID pendingRepair, boolean pullRepair, PreviewKind previewKind) + public SymmetricLocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, UUID pendingRepair, boolean requestRanges, boolean transferRanges, PreviewKind previewKind) { super(desc, r1, r2, previewKind); - this.remoteIsTransient = remoteIsTransient; + assert requestRanges || transferRanges : "Nothing to do in a sync job"; + assert r1.endpoint.equals(FBUtilities.getBroadcastAddressAndPort()); + this.pendingRepair = pendingRepair; - this.pullRepair = pullRepair; + this.requestRanges = requestRanges; + this.transferRanges = transferRanges; } @VisibleForTesting @@ -66,12 +69,16 @@ StreamPlan createStreamPlan(InetAddressAndPort dst, List> differenc { StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind) .listeners(this) - .flushBeforeTransfer(pendingRepair == null) - // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here - .requestRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences), - RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); // request ranges from the remote node + .flushBeforeTransfer(pendingRepair == null); + + if (requestRanges) + { + // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here + plan.requestRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences), + RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); // request ranges from the remote node + } - if (!pullRepair && !remoteIsTransient) + if (transferRanges) { // send ranges to the remote node if we are not performing a pull repair // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here @@ -88,9 +95,7 @@ StreamPlan createStreamPlan(InetAddressAndPort dst, List> differenc @Override protected void startSync(List> differences) { - InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); - // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding - InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint; + InetAddressAndPort dst = r2.endpoint; String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); diff --git a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java index 92ae172e2f69..f3e1af105f15 100644 --- a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java @@ -60,6 +60,7 @@ public class SymmetricLocalSyncTaskTest extends AbstractRepairTest { private static final IPartitioner partitioner = Murmur3Partitioner.instance; + private static final InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); public static final String KEYSPACE1 = "DifferencerTest"; public static final String CF_STANDARD = "Standard1"; public static ColumnFamilyStore cfs; @@ -82,8 +83,7 @@ public static void defineSchema() @Test public void testNoDifference() throws Throwable { - final InetAddressAndPort ep1 = InetAddressAndPort.getByName("127.0.0.1"); - final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.1"); + final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.2"); Range range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); @@ -94,9 +94,9 @@ public void testNoDifference() throws Throwable // difference the trees // note: we reuse the same endpoint which is bogus in theory but fine here - TreeResponse r1 = new TreeResponse(ep1, tree1); + TreeResponse r1 = new TreeResponse(local, tree1); TreeResponse r2 = new TreeResponse(ep2, tree2); - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE); + SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); task.run(); assertEquals(0, task.get().numberOfDifferences); @@ -118,7 +118,6 @@ public void testDifference() throws Throwable RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); MerkleTrees tree1 = createInitialTree(desc); - MerkleTrees tree2 = createInitialTree(desc); // change a range in one of the trees @@ -132,9 +131,9 @@ public void testDifference() throws Throwable // difference the trees // note: we reuse the same endpoint which is bogus in theory but fine here - TreeResponse r1 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1); + TreeResponse r1 = new TreeResponse(local, tree1); TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2); - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE); + SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 1; DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2); try @@ -158,11 +157,11 @@ public void fullRepairStreamPlan() throws Exception ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); - TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1)); + SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair()); assertTrue(plan.getFlushBeforeTransfer()); @@ -183,11 +182,11 @@ public void incrementalRepairStreamPlan() throws Exception ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); - TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, desc.parentSessionId, false, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1)); + SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, desc.parentSessionId, true, true, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); assertEquals(desc.parentSessionId, plan.getPendingRepair()); assertFalse(plan.getFlushBeforeTransfer()); @@ -204,11 +203,11 @@ public void transientStreamPlan() ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); - TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, true, desc.parentSessionId, false, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(PARTICIPANT2, Lists.newArrayList(RANGE1)); + SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, desc.parentSessionId, true, false, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); assertNumInOut(plan, 1, 0); } From 1d19e26fbbb97d76590d8b164feb61d185d74ba2 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Wed, 5 Sep 2018 10:16:51 +0200 Subject: [PATCH 05/13] enable dtests --- .circleci/config.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5a84f724fcf8..9a79b49ddf2d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -58,16 +58,16 @@ with_dtest_jobs_only: &with_dtest_jobs_only - build # Set env_settings, env_vars, and workflows/build_and_run_tests based on environment env_settings: &env_settings - <<: *default_env_settings - #<<: *high_capacity_env_settings +# <<: *default_env_settings + <<: *high_capacity_env_settings env_vars: &env_vars - <<: *resource_constrained_env_vars - #<<: *high_capacity_env_vars +# <<: *resource_constrained_env_vars + <<: *high_capacity_env_vars workflows: version: 2 - build_and_run_tests: *default_jobs + #build_and_run_tests: *default_jobs #build_and_run_tests: *with_dtest_jobs_only - #build_and_run_tests: *with_dtest_jobs + build_and_run_tests: *with_dtest_jobs docker_image: &docker_image kjellman/cassandra-test:0.4.3 version: 2 jobs: From c9d4e02aeb9f527e4584721881de2794085c594f Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Wed, 5 Sep 2018 15:14:24 +0200 Subject: [PATCH 06/13] Switch to branch that has TR tests --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 9a79b49ddf2d..22ae51cc92aa 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -206,7 +206,7 @@ jobs: name: Clone Cassandra dtest Repository (via git) command: | export LANG=en_US.UTF-8 - git clone --single-branch --branch master --depth 1 git://github.com/apache/cassandra-dtest.git ~/cassandra-dtest + git clone --single-branch --branch 14409 --depth 1 git://github.com/ifesdjeen/cassandra-dtest.git ~/cassandra-dtest - run: name: Configure virtualenv and python Dependencies command: | From 07acea8e98ca762d06cc61affad371049a49943b Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Wed, 5 Sep 2018 17:14:01 +0200 Subject: [PATCH 07/13] Use Sets#difference instead of per-element equality --- src/java/org/apache/cassandra/repair/RepairRunnable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 132b44f4c954..e49ca4fa0175 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -651,7 +651,7 @@ private static void addRangeToNeighbors(List neighborRangeList, Ran for (CommonRange cr : neighborRangeList) { // Use strict equality here, as worst thing that can happen is we generate one more stream - if (Iterables.elementsEqual(cr.endpoints, endpoints) && Iterables.elementsEqual(cr.transEndpoints, transEndpoints)) + if (Sets.difference(cr.endpoints, endpoints).isEmpty() && Sets.difference(cr.transEndpoints, transEndpoints).isEmpty()) { cr.ranges.add(range); return; From 31f709c2d08b81245f4ecfa931c2617aa455f9e7 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Thu, 6 Sep 2018 10:25:09 +0200 Subject: [PATCH 08/13] Attempt to reduce class hierarchy --- .../repair/AsymmetricLocalSyncTask.java | 105 ------------------ ...cLocalSyncTask.java => LocalSyncTask.java} | 34 +++--- .../apache/cassandra/repair/RepairJob.java | 8 +- .../repair/SymmetricRemoteSyncTask.java | 7 +- .../cassandra/repair/SymmetricSyncTask.java | 24 ++-- .../repair/SymmetricLocalSyncTaskTest.java | 10 +- 6 files changed, 45 insertions(+), 143 deletions(-) delete mode 100644 src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java rename src/java/org/apache/cassandra/repair/{SymmetricLocalSyncTask.java => LocalSyncTask.java} (77%) diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java deleted file mode 100644 index eaf890ac3939..000000000000 --- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.repair; - -import java.util.Collections; -import java.util.List; -import java.util.UUID; - -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.RangesAtEndpoint; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.streaming.ProgressInfo; -import org.apache.cassandra.streaming.StreamEvent; -import org.apache.cassandra.streaming.StreamEventHandler; -import org.apache.cassandra.streaming.StreamOperation; -import org.apache.cassandra.streaming.StreamPlan; -import org.apache.cassandra.streaming.StreamState; -import org.apache.cassandra.tracing.TraceState; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.FBUtilities; - -public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements StreamEventHandler -{ - private final UUID pendingRepair; - private final TraceState state = Tracing.instance.get(); - - public AsymmetricLocalSyncTask(RepairJobDesc desc, InetAddressAndPort fetchFrom, List> rangesToFetch, UUID pendingRepair, PreviewKind previewKind) - { - super(desc, FBUtilities.getBroadcastAddressAndPort(), fetchFrom, rangesToFetch, previewKind); - this.pendingRepair = pendingRepair; - } - - public void startSync(List> rangesToFetch) - { - StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, - 1, false, - pendingRepair, - previewKind) - .listeners(this) - .flushBeforeTransfer(pendingRepair == null) - // request ranges from the remote node, see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here - .requestRanges(fetchFrom, desc.keyspace, RangesAtEndpoint.toDummyList(rangesToFetch), - RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); - plan.execute(); - - } - - public void handleStreamEvent(StreamEvent event) - { - if (state == null) - return; - switch (event.eventType) - { - case STREAM_PREPARED: - StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event; - state.trace("Streaming session with {} prepared", spe.session.peer); - break; - case STREAM_COMPLETE: - StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event; - state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed"); - break; - case FILE_PROGRESS: - ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress; - state.trace("{}/{} ({}%) {} idx:{}{}", - new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes), - FBUtilities.prettyPrintMemory(pi.totalBytes), - pi.currentBytes * 100 / pi.totalBytes, - pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from", - pi.sessionIndex, - pi.peer }); - } - } - - public void onSuccess(StreamState result) - { - String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, fetchingNode, fetchFrom, desc.columnFamily); - Tracing.traceRepair(message); - set(stat); - finished(); - } - - public void onFailure(Throwable t) - { - setException(t); - finished(); - } -} diff --git a/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java similarity index 77% rename from src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java rename to src/java/org/apache/cassandra/repair/LocalSyncTask.java index 2eeb438444a2..32c2edb789b0 100644 --- a/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -39,25 +39,31 @@ import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTrees; /** - * SymmetricLocalSyncTask performs streaming between local(coordinator) node and remote replica. + * LocalSyncTask performs streaming between local(coordinator) node and remote replica. */ -public class SymmetricLocalSyncTask extends SymmetricSyncTask implements StreamEventHandler +public class LocalSyncTask extends SymmetricSyncTask implements StreamEventHandler { private final TraceState state = Tracing.instance.get(); - private static final Logger logger = LoggerFactory.getLogger(SymmetricLocalSyncTask.class); + private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class); private final UUID pendingRepair; private final boolean requestRanges; private final boolean transferRanges; - public SymmetricLocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, UUID pendingRepair, boolean requestRanges, boolean transferRanges, PreviewKind previewKind) + public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair, boolean requestRanges, boolean transferRanges, PreviewKind previewKind) { - super(desc, r1, r2, previewKind); + this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees), pendingRepair, requestRanges, transferRanges, previewKind); + } + + public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote, List> diff, UUID pendingRepair, boolean requestRanges, boolean transferRanges, PreviewKind previewKind) + { + super(desc, local, remote, diff, previewKind); assert requestRanges || transferRanges : "Nothing to do in a sync job"; - assert r1.endpoint.equals(FBUtilities.getBroadcastAddressAndPort()); + assert local.equals(FBUtilities.getBroadcastAddressAndPort()); this.pendingRepair = pendingRepair; this.requestRanges = requestRanges; @@ -65,7 +71,7 @@ public SymmetricLocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse } @VisibleForTesting - StreamPlan createStreamPlan(InetAddressAndPort dst, List> differences) + StreamPlan createStreamPlan(InetAddressAndPort remote, List> differences) { StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind) .listeners(this) @@ -74,15 +80,15 @@ StreamPlan createStreamPlan(InetAddressAndPort dst, List> differenc if (requestRanges) { // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here - plan.requestRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences), - RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); // request ranges from the remote node + plan.requestRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences), + RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); } if (transferRanges) { // send ranges to the remote node if we are not performing a pull repair // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here - plan.transferRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily); + plan.transferRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily); } return plan; @@ -95,13 +101,13 @@ StreamPlan createStreamPlan(InetAddressAndPort dst, List> differenc @Override protected void startSync(List> differences) { - InetAddressAndPort dst = r2.endpoint; + InetAddressAndPort remote = endpoint2; - String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst); + String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), remote); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); - createStreamPlan(dst, differences).execute(); + createStreamPlan(remote, differences).execute(); } public void handleStreamEvent(StreamEvent event) @@ -132,7 +138,7 @@ public void handleStreamEvent(StreamEvent event) public void onSuccess(StreamState result) { - String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily); + String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, endpoint1, endpoint2, desc.columnFamily); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); set(stat.withSummaries(result.createSummaries())); diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 30bc0850393b..f2c132873776 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -197,7 +197,7 @@ private ListenableFuture> standardSyncing(List tree if (!requestRanges && !transfterRanges) continue; - task = new SymmetricLocalSyncTask(desc, self, remote, isIncremental ? desc.parentSessionId : null, requestRanges, transfterRanges, session.previewKind); + task = new LocalSyncTask(desc, self, remote, isIncremental ? desc.parentSessionId : null, requestRanges, transfterRanges, session.previewKind); } else if (isTransient(r1.endpoint) || isTransient(r2.endpoint)) { @@ -253,15 +253,15 @@ private ListenableFuture> optimisedSyncing(List tre { List> toFetch = streamsFor.get(fetchFrom); logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom); - AsymmetricSyncTask task; + AbstractSyncTask task; if (address.equals(local)) { - task = new AsymmetricLocalSyncTask(desc, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null, previewKind); + task = new LocalSyncTask(desc, address, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null, true, false, session.previewKind); } else { task = new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind); - session.waitForSync(Pair.create(desc, new NodePair(address, fetchFrom)), (AsymmetricRemoteSyncTask) task); + session.waitForSync(Pair.create(desc, task.nodePair()), (AsymmetricRemoteSyncTask) task); } syncTasks.add(task); taskExecutor.submit(task); diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java index 1f2740f8e36f..ad430dae8461 100644 --- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java @@ -36,6 +36,7 @@ import org.apache.cassandra.streaming.SessionSummary; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTrees; /** * SymmetricRemoteSyncTask sends {@link SyncRequest} to remote(non-coordinator) node @@ -49,7 +50,7 @@ public class SymmetricRemoteSyncTask extends SymmetricSyncTask implements Comple public SymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind) { - super(desc, r1, r2, previewKind); + super(desc, r1.endpoint, r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees), previewKind); } void sendRequest(RepairMessage request, InetAddressAndPort to) @@ -62,7 +63,7 @@ protected void startSync(List> differences) { InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); - SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences, previewKind); + SyncRequest request = new SyncRequest(desc, local, endpoint1, endpoint2, differences, previewKind); String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); @@ -77,7 +78,7 @@ public void syncComplete(boolean success, List summaries) } else { - setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint))); + setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", endpoint1, endpoint2))); } finished(); } diff --git a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java index 423f98f419fb..c1fa9330c2af 100644 --- a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java +++ b/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java @@ -26,9 +26,9 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.MerkleTrees; /** * SymmetricSyncTask will calculate the difference of MerkleTree between two nodes @@ -39,20 +39,22 @@ public abstract class SymmetricSyncTask extends AbstractSyncTask private static Logger logger = LoggerFactory.getLogger(SymmetricSyncTask.class); protected final RepairJobDesc desc; - protected final TreeResponse r1; - protected final TreeResponse r2; + protected final InetAddressAndPort endpoint1; + protected final InetAddressAndPort endpoint2; + protected final List> differences; protected final PreviewKind previewKind; protected final NodePair nodePair; protected volatile SyncStat stat; protected long startTime = Long.MIN_VALUE; - public SymmetricSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind) + public SymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort endpoint1, InetAddressAndPort endpoint2, List> differences, PreviewKind previewKind) { this.desc = desc; - this.r1 = r1; - this.r2 = r2; + this.endpoint1 = endpoint1; + this.endpoint2 = endpoint2; + this.differences = differences; this.previewKind = previewKind; - this.nodePair = new NodePair(r1.endpoint, r2.endpoint); + this.nodePair = new NodePair(endpoint1, endpoint2); } /** @@ -61,24 +63,22 @@ public SymmetricSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, P public void run() { startTime = System.currentTimeMillis(); - // compare trees, and collect differences - List> differences = MerkleTrees.difference(r1.trees, r2.trees); stat = new SyncStat(nodePair, differences.size()); // choose a repair method based on the significance of the difference - String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), r1.endpoint, r2.endpoint, desc.columnFamily); + String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), endpoint1, endpoint2, desc.columnFamily); if (differences.isEmpty()) { logger.info(String.format(format, "are consistent")); - Tracing.traceRepair("Endpoint {} is consistent with {} for {}", r1.endpoint, r2.endpoint, desc.columnFamily); + Tracing.traceRepair("Endpoint {} is consistent with {} for {}", endpoint1, endpoint2, desc.columnFamily); set(stat); return; } // non-0 difference: perform streaming repair logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync")); - Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", r1.endpoint, differences.size(), r2.endpoint, desc.columnFamily); + Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", endpoint1, differences.size(), endpoint2, desc.columnFamily); startSync(differences); } diff --git a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java index f3e1af105f15..3c5c0bcb6bc8 100644 --- a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java @@ -96,7 +96,7 @@ public void testNoDifference() throws Throwable // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new TreeResponse(local, tree1); TreeResponse r2 = new TreeResponse(ep2, tree2); - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); task.run(); assertEquals(0, task.get().numberOfDifferences); @@ -133,7 +133,7 @@ public void testDifference() throws Throwable // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new TreeResponse(local, tree1); TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2); - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 1; DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2); try @@ -160,7 +160,7 @@ public void fullRepairStreamPlan() throws Exception TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair()); @@ -185,7 +185,7 @@ public void incrementalRepairStreamPlan() throws Exception TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, desc.parentSessionId, true, true, PreviewKind.NONE); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, true, true, PreviewKind.NONE); StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); assertEquals(desc.parentSessionId, plan.getPendingRepair()); @@ -206,7 +206,7 @@ public void transientStreamPlan() TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, desc.parentSessionId, true, false, PreviewKind.NONE); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, true, false, PreviewKind.NONE); StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); assertNumInOut(plan, 1, 0); } From 7cc36f2b0820beca9009968e12b5cd2e02088e01 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Thu, 6 Sep 2018 15:42:28 +0200 Subject: [PATCH 09/13] Fix SymmetricRemoteSyncTaskTest --- .../apache/cassandra/repair/SymmetricRemoteSyncTask.java | 8 ++++++++ .../cassandra/repair/SymmetricRemoteSyncTaskTest.java | 4 ++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java index ad430dae8461..b76622fb3f31 100644 --- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.function.Predicate; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +54,13 @@ public SymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse super(desc, r1.endpoint, r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees), previewKind); } + @VisibleForTesting + SymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort e1, InetAddressAndPort e2, + List> differences, PreviewKind previewKind) + { + super(desc, e1, e2, differences, previewKind); + } + void sendRequest(RepairMessage request, InetAddressAndPort to) { MessagingService.instance().sendOneWay(request.createMessage(), to); diff --git a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java index 06f968f6fc3a..3a8694650a7b 100644 --- a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java @@ -30,18 +30,18 @@ import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.SyncRequest; import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.UUIDGen; public class SymmetricRemoteSyncTaskTest extends AbstractRepairTest { private static final RepairJobDesc DESC = new RepairJobDesc(UUIDGen.getTimeUUID(), UUIDGen.getTimeUUID(), "ks", "tbl", ALL_RANGES); private static final List> RANGE_LIST = ImmutableList.of(RANGE1); - private static class InstrumentedSymmetricRemoteSyncTask extends SymmetricRemoteSyncTask { public InstrumentedSymmetricRemoteSyncTask(InetAddressAndPort e1, InetAddressAndPort e2) { - super(DESC, new TreeResponse(e1, null), new TreeResponse(e2, null), PreviewKind.NONE); + super(DESC, e1, e2, RANGE_LIST, PreviewKind.NONE); } RepairMessage sentMessage = null; From 592cf72c7ef152d09472fe4b7215552b10571e89 Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Wed, 5 Sep 2018 10:20:52 -0700 Subject: [PATCH 10/13] misc updates --- .../cassandra/repair/AsymmetricSyncTask.java | 6 ++---- .../apache/cassandra/repair/CommonRange.java | 8 +++++++- .../cassandra/repair/LocalSyncTask.java | 5 +++-- .../apache/cassandra/repair/RepairJob.java | 6 ++++-- .../cassandra/repair/RepairRunnable.java | 11 +++++----- .../cassandra/repair/SymmetricSyncTask.java | 2 ++ .../repair/SymmetricLocalSyncTaskTest.java | 20 ++++++++++++++++++- 7 files changed, 42 insertions(+), 16 deletions(-) diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java index c1800561358a..afd82129f33d 100644 --- a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +47,7 @@ public abstract class AsymmetricSyncTask extends AbstractSyncTask public AsymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, List> rangesToFetch, PreviewKind previewKind) { - assert !fetchFrom.equals(fetchingNode) : "Fetching from self " + fetchFrom; + Preconditions.checkArgument(!fetchFrom.equals(fetchingNode), "Sending and receiving node are the same: %s", fetchFrom); this.desc = desc; this.fetchFrom = fetchFrom; this.fetchingNode = fetchingNode; @@ -55,7 +56,6 @@ public AsymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort fetchingNode, I // todo: make an AsymmetricSyncStat? stat = new SyncStat(nodePair, rangesToFetch.size()); this.previewKind = previewKind; - } public NodePair nodePair() @@ -87,6 +87,4 @@ protected void finished() if (startTime != Long.MIN_VALUE) Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); } - - } diff --git a/src/java/org/apache/cassandra/repair/CommonRange.java b/src/java/org/apache/cassandra/repair/CommonRange.java index 928e570c428b..6b55dc758b76 100644 --- a/src/java/org/apache/cassandra/repair/CommonRange.java +++ b/src/java/org/apache/cassandra/repair/CommonRange.java @@ -48,7 +48,13 @@ public CommonRange(Set endpoints, Set tr this.endpoints = ImmutableSet.copyOf(endpoints); this.transEndpoints = ImmutableSet.copyOf(transEndpoints); - this.ranges = new ArrayList(ranges); + this.ranges = new ArrayList<>(ranges); + } + + public boolean matchesEndpoints(Set endpoints, Set transEndpoints) + { + // Use strict equality here, as worst thing that can happen is we generate one more stream + return this.endpoints.equals(endpoints) && this.transEndpoints.equals(transEndpoints); } public boolean equals(Object o) diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index 32c2edb789b0..17a6d7a037da 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -22,6 +22,7 @@ import java.util.UUID; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,8 +63,8 @@ public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote, List> diff, UUID pendingRepair, boolean requestRanges, boolean transferRanges, PreviewKind previewKind) { super(desc, local, remote, diff, previewKind); - assert requestRanges || transferRanges : "Nothing to do in a sync job"; - assert local.equals(FBUtilities.getBroadcastAddressAndPort()); + Preconditions.checkArgument(requestRanges || transferRanges, "Nothing to do in a sync job"); + Preconditions.checkArgument(local.equals(FBUtilities.getBroadcastAddressAndPort())); this.pendingRepair = pendingRepair; this.requestRanges = requestRanges; diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index f2c132873776..1ebdaa262111 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -20,6 +20,7 @@ import java.util.*; import java.util.stream.Collectors; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.*; import org.slf4j.Logger; @@ -197,7 +198,8 @@ private ListenableFuture> standardSyncing(List tree if (!requestRanges && !transfterRanges) continue; - task = new LocalSyncTask(desc, self, remote, isIncremental ? desc.parentSessionId : null, requestRanges, transfterRanges, session.previewKind); + task = new LocalSyncTask(desc, self, remote, isIncremental ? desc.parentSessionId : null, + requestRanges, transfterRanges, session.previewKind); } else if (isTransient(r1.endpoint) || isTransient(r2.endpoint)) { @@ -248,7 +250,7 @@ private ListenableFuture> optimisedSyncing(List tre HostDifferences streamsFor = reducedDifferences.get(address); if (streamsFor != null) { - assert streamsFor.get(address).isEmpty() : "We should not fetch ranges from ourselves"; + Preconditions.checkArgument(streamsFor.get(address).isEmpty(), "We should not fetch ranges from ourselves"); for (InetAddressAndPort fetchFrom : streamsFor.hosts()) { List> toFetch = streamsFor.get(fetchFrom); diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index e49ca4fa0175..fa0c2a93ff55 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -205,8 +205,8 @@ protected void runMayThrow() throws Exception for (Range range : options.getRanges()) { EndpointsForRange neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, - options.getDataCenters(), - options.getHosts()); + options.getDataCenters(), + options.getHosts()); addRangeToNeighbors(commonRanges, range, neighbors); allNeighbors.addAll(neighbors.endpoints()); @@ -648,12 +648,11 @@ private static void addRangeToNeighbors(List neighborRangeList, Ran Set endpoints = neighbors.endpoints(); Set transEndpoints = neighbors.filter(Replica::isTransient).endpoints(); - for (CommonRange cr : neighborRangeList) + for (CommonRange commonRange : neighborRangeList) { - // Use strict equality here, as worst thing that can happen is we generate one more stream - if (Sets.difference(cr.endpoints, endpoints).isEmpty() && Sets.difference(cr.transEndpoints, transEndpoints).isEmpty()) + if (commonRange.matchesEndpoints(endpoints, transEndpoints)) { - cr.ranges.add(range); + commonRange.ranges.add(range); return; } } diff --git a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java index c1fa9330c2af..4313214e6f06 100644 --- a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java +++ b/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +50,7 @@ public abstract class SymmetricSyncTask extends AbstractSyncTask public SymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort endpoint1, InetAddressAndPort endpoint2, List> differences, PreviewKind previewKind) { + Preconditions.checkArgument(!endpoint1.equals(endpoint2), "Both sync targets are the same: %s", endpoint1); this.desc = desc; this.endpoint1 = endpoint1; this.endpoint2 = endpoint2; diff --git a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java index 3c5c0bcb6bc8..1732ecdca263 100644 --- a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java @@ -197,7 +197,7 @@ public void incrementalRepairStreamPlan() throws Exception * Don't reciprocate streams if the other endpoint is a transient replica */ @Test - public void transientStreamPlan() + public void transientRemoteStreamPlan() { UUID sessionID = registerSession(cfs, true, true); ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); @@ -211,6 +211,24 @@ public void transientStreamPlan() assertNumInOut(plan, 1, 0); } + /** + * Don't request streams if the other endpoint is a transient replica + */ + @Test + public void transientLocalStreamPlan() + { + UUID sessionID = registerSession(cfs, true, true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); + + TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + + SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, desc.parentSessionId, false, true, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); + assertNumInOut(plan, 0, 1); + } + private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner partitioner) { MerkleTrees tree = new MerkleTrees(partitioner); From 389223a5f7e9c470a0faac51b25e91ce43c63007 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Thu, 6 Sep 2018 15:49:51 +0200 Subject: [PATCH 11/13] Rename test, too --- ...SymmetricLocalSyncTaskTest.java => LocalSyncTaskTest.java} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename test/unit/org/apache/cassandra/repair/{SymmetricLocalSyncTaskTest.java => LocalSyncTaskTest.java} (98%) diff --git a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java similarity index 98% rename from test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java rename to test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index 1732ecdca263..4a11c90d2627 100644 --- a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -57,7 +57,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -public class SymmetricLocalSyncTaskTest extends AbstractRepairTest +public class LocalSyncTaskTest extends AbstractRepairTest { private static final IPartitioner partitioner = Murmur3Partitioner.instance; private static final InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); @@ -224,7 +224,7 @@ public void transientLocalStreamPlan() TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, desc.parentSessionId, false, true, PreviewKind.NONE); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false, true, PreviewKind.NONE); StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); assertNumInOut(plan, 0, 1); } From f4e1f3fc55f7b1466780b0cfdf473cc7c1b61a13 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Thu, 6 Sep 2018 17:14:02 +0200 Subject: [PATCH 12/13] Cosmetics --- .../org/apache/cassandra/repair/LocalSyncTask.java | 10 +++++++--- src/java/org/apache/cassandra/repair/RepairJob.java | 3 ++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index 17a6d7a037da..bb1595474007 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -55,12 +55,16 @@ public class LocalSyncTask extends SymmetricSyncTask implements StreamEventHandl private final boolean requestRanges; private final boolean transferRanges; - public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair, boolean requestRanges, boolean transferRanges, PreviewKind previewKind) + public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair, + boolean requestRanges, boolean transferRanges, PreviewKind previewKind) { - this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees), pendingRepair, requestRanges, transferRanges, previewKind); + this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees), + pendingRepair, requestRanges, transferRanges, previewKind); } - public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote, List> diff, UUID pendingRepair, boolean requestRanges, boolean transferRanges, PreviewKind previewKind) + public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote, + List> diff, UUID pendingRepair, + boolean requestRanges, boolean transferRanges, PreviewKind previewKind) { super(desc, local, remote, diff, previewKind); Preconditions.checkArgument(requestRanges || transferRanges, "Nothing to do in a sync job"); diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 1ebdaa262111..141fcec4bc62 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -258,7 +258,8 @@ private ListenableFuture> optimisedSyncing(List tre AbstractSyncTask task; if (address.equals(local)) { - task = new LocalSyncTask(desc, address, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null, true, false, session.previewKind); + task = new LocalSyncTask(desc, address, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null, + true, false, session.previewKind); } else { From 090b2245d49142ca990b7bc82627d55e8b4848f9 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Thu, 6 Sep 2018 23:12:47 +0200 Subject: [PATCH 13/13] =?UTF-8?q?Fix=20Ariel=E2=80=99s=20comments?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/cassandra/repair/RepairJob.java | 6 ++-- .../service/ActiveRepairService.java | 36 +++++-------------- 2 files changed, 12 insertions(+), 30 deletions(-) diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 141fcec4bc62..eced965c3a7b 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -192,14 +192,14 @@ private ListenableFuture> standardSyncing(List tree // pull only if local is full boolean requestRanges = !isTransient(self.endpoint); // push only if remote is full; additionally check for pull repair - boolean transfterRanges = !isTransient(remote.endpoint) && !session.pullRepair; + boolean transferRanges = !isTransient(remote.endpoint) && !session.pullRepair; // Nothing to do - if (!requestRanges && !transfterRanges) + if (!requestRanges && !transferRanges) continue; task = new LocalSyncTask(desc, self, remote, isIncremental ? desc.parentSessionId : null, - requestRanges, transfterRanges, session.previewKind); + requestRanges, transferRanges, session.previewKind); } else if (isTransient(r1.endpoint) || isTransient(r2.endpoint)) { diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index f7472aeaf8b5..8ffca6a7bdce 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -20,25 +20,10 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; + import javax.management.MBeanServer; import javax.management.ObjectName; @@ -49,8 +34,11 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.AbstractFuture; + import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.cassandra.locator.EndpointsByRange; +import org.apache.cassandra.locator.EndpointsForRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,12 +56,10 @@ import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; import org.apache.cassandra.gms.IFailureDetectionEventListener; -import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.gms.VersionedValue; -import org.apache.cassandra.locator.EndpointsByRange; -import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IAsyncCallbackWithFailure; @@ -81,18 +67,14 @@ import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.CommonRange; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.repair.RepairSession; import org.apache.cassandra.repair.consistent.CoordinatorSessions; import org.apache.cassandra.repair.consistent.LocalSessions; -import org.apache.cassandra.repair.messages.PrepareMessage; -import org.apache.cassandra.repair.messages.RepairMessage; -import org.apache.cassandra.repair.messages.RepairOption; -import org.apache.cassandra.repair.messages.SyncComplete; -import org.apache.cassandra.repair.messages.ValidationComplete; +import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities;