From b84ec51b4c73d7b9b58ea8a1708ae2f330972970 Mon Sep 17 00:00:00 2001 From: David Capwell Date: Fri, 12 Nov 2021 15:47:44 -0800 Subject: [PATCH] Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs patch by David Capwell; reviewed by Marcus Eriksson for CASSANDRA-17069 --- CHANGES.txt | 1 + checkstyle.xml | 22 +- .../apache/cassandra/cache/CaffeineCache.java | 3 +- .../apache/cassandra/cache/ChunkCache.java | 3 +- .../cassandra/cache/SerializingCache.java | 3 +- .../apache/cassandra/cql3/QueryProcessor.java | 3 +- .../db/repair/PendingAntiCompaction.java | 2 +- .../sasi/analyzer/filter/StemmerFactory.java | 3 +- .../sasi/analyzer/filter/StopWordFactory.java | 3 +- .../metrics/HintedHandoffMetrics.java | 5 +- .../metrics/HintsServiceMetrics.java | 3 +- .../cassandra/repair/AbstractRepairTask.java | 122 ++++++ .../repair/CoordinatedRepairResult.java | 108 +++++ .../repair/IncrementalRepairTask.java | 75 ++++ .../cassandra/repair/NormalRepairTask.java | 57 +++ .../cassandra/repair/PreviewRepairTask.java | 145 +++++++ .../apache/cassandra/repair/RepairJob.java | 4 +- .../cassandra/repair/RepairNotifier.java | 25 ++ .../cassandra/repair/RepairRunnable.java | 391 +++--------------- .../apache/cassandra/repair/RepairTask.java | 43 ++ .../repair/SomeRepairFailedException.java | 5 + .../repair/consistent/CoordinatorSession.java | 138 ++----- .../repair/consistent/SyncStatSummary.java | 7 +- .../cassandra/security/CipherFactory.java | 3 +- .../service/ActiveRepairService.java | 15 +- .../utils/concurrent/AbstractFuture.java | 44 +- .../utils/concurrent/AsyncFuture.java | 15 +- .../cassandra/utils/concurrent/Future.java | 30 +- .../utils/concurrent/ListenerList.java | 33 +- .../utils/concurrent/SyncFuture.java | 15 +- .../AbstractNetstatsBootstrapStreaming.java | 4 +- .../test/repair/ForceRepairTest.java | 170 ++++++++ .../consistent/CoordinatorMessagingTest.java | 36 +- .../consistent/CoordinatorSessionTest.java | 61 +-- .../concurrent/AbstractTestAsyncPromise.java | 83 ++-- 35 files changed, 1124 insertions(+), 556 deletions(-) create mode 100644 src/java/org/apache/cassandra/repair/AbstractRepairTask.java create mode 100644 src/java/org/apache/cassandra/repair/CoordinatedRepairResult.java create mode 100644 src/java/org/apache/cassandra/repair/IncrementalRepairTask.java create mode 100644 src/java/org/apache/cassandra/repair/NormalRepairTask.java create mode 100644 src/java/org/apache/cassandra/repair/PreviewRepairTask.java create mode 100644 src/java/org/apache/cassandra/repair/RepairNotifier.java create mode 100644 src/java/org/apache/cassandra/repair/RepairTask.java create mode 100644 test/distributed/org/apache/cassandra/distributed/test/repair/ForceRepairTest.java diff --git a/CHANGES.txt b/CHANGES.txt index fb97fec7cec8..12848afcc114 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1 + * Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs (CASSANDRA-17069) * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130) * Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065) * Implement Virtual Tables for Auth Caches (CASSANDRA-16914) diff --git a/checkstyle.xml b/checkstyle.xml index 4bc31dd348be..19bfa8627f3b 100644 --- a/checkstyle.xml +++ b/checkstyle.xml @@ -51,14 +51,30 @@ - + - - + + + + + + + + + + + + + + + + + + diff --git a/src/java/org/apache/cassandra/cache/CaffeineCache.java 
b/src/java/org/apache/cassandra/cache/CaffeineCache.java index d51ea8403751..b01093f9ae93 100644 --- a/src/java/org/apache/cassandra/cache/CaffeineCache.java +++ b/src/java/org/apache/cassandra/cache/CaffeineCache.java @@ -27,6 +27,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Policy.Eviction; import com.github.benmanes.caffeine.cache.Weigher; +import org.apache.cassandra.concurrent.ImmediateExecutor; /** * An adapter from a Caffeine cache to the ICache interface. This provides an on-heap cache using @@ -54,7 +55,7 @@ public static Caffein Cache cache = Caffeine.newBuilder() .maximumWeight(weightedCapacity) .weigher(weigher) - .executor(MoreExecutors.directExecutor()) + .executor(ImmediateExecutor.INSTANCE) .build(); return new CaffeineCache<>(cache); } diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java b/src/java/org/apache/cassandra/cache/ChunkCache.java index c53810ac4806..397850ae30c3 100644 --- a/src/java/org/apache/cassandra/cache/ChunkCache.java +++ b/src/java/org/apache/cassandra/cache/ChunkCache.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.github.benmanes.caffeine.cache.*; +import org.apache.cassandra.concurrent.ImmediateExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.util.*; @@ -143,7 +144,7 @@ private ChunkCache(BufferPool pool) metrics = new ChunkCacheMetrics(this); cache = Caffeine.newBuilder() .maximumWeight(cacheSize) - .executor(MoreExecutors.directExecutor()) + .executor(ImmediateExecutor.INSTANCE) .weigher((key, buffer) -> ((Buffer) buffer).buffer.capacity()) .removalListener(this) .recordStats(() -> metrics) diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java index 55c20ec4d728..cd028e2779bf 100644 --- a/src/java/org/apache/cassandra/cache/SerializingCache.java +++ b/src/java/org/apache/cassandra/cache/SerializingCache.java @@ -21,6 +21,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Weigher; +import org.apache.cassandra.concurrent.ImmediateExecutor; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.util.MemoryInputStream; import org.apache.cassandra.io.util.MemoryOutputStream; @@ -51,7 +52,7 @@ private SerializingCache(long capacity, Weigher weigher, IS this.cache = Caffeine.newBuilder() .weigher(weigher) .maximumWeight(capacity) - .executor(MoreExecutors.directExecutor()) + .executor(ImmediateExecutor.INSTANCE) .removalListener((key, mem, cause) -> { if (cause.wasEvicted()) { mem.unreference(); diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index eea83363d279..ed998618c53c 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import org.antlr.runtime.*; +import org.apache.cassandra.concurrent.ImmediateExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.metrics.ClientRequestMetrics; @@ -97,7 +98,7 @@ public class QueryProcessor implements QueryHandler static { preparedStatements = Caffeine.newBuilder() - .executor(MoreExecutors.directExecutor()) + .executor(ImmediateExecutor.INSTANCE) 
.maximumWeight(capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMB())) .weigher(QueryProcessor::measure) .removalListener((key, prepared, cause) -> { diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java index 59eff556c576..0007bbe6b32f 100644 --- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java +++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java @@ -372,7 +372,7 @@ public Future> run() } Future> acquisitionResults = FutureCombiner.successfulOf(tasks); - return acquisitionResults.andThenAsync(getAcquisitionCallback(prsId, tokenRanges)); + return acquisitionResults.flatMap(getAcquisitionCallback(prsId, tokenRanges)); } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StemmerFactory.java b/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StemmerFactory.java index d278c28e8acb..17f151211e93 100644 --- a/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StemmerFactory.java +++ b/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StemmerFactory.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.MoreExecutors; +import org.apache.cassandra.concurrent.ImmediateExecutor; import org.tartarus.snowball.SnowballStemmer; import org.tartarus.snowball.ext.*; @@ -42,7 +43,7 @@ public class StemmerFactory { private static final Logger logger = LoggerFactory.getLogger(StemmerFactory.class); private static final LoadingCache> STEMMER_CONSTRUCTOR_CACHE = Caffeine.newBuilder() - .executor(MoreExecutors.directExecutor()) + .executor(ImmediateExecutor.INSTANCE) .build(new CacheLoader>() { public Constructor load(Class aClass) throws Exception diff --git a/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StopWordFactory.java b/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StopWordFactory.java index 56c07f310759..434fbda0a64b 100644 --- a/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StopWordFactory.java +++ b/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StopWordFactory.java @@ -32,6 +32,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import org.apache.cassandra.concurrent.ImmediateExecutor; import org.apache.cassandra.io.util.File; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +52,7 @@ public class StopWordFactory "pl","pt","ro","ru","sv")); private static final LoadingCache> STOP_WORDS_CACHE = Caffeine.newBuilder() - .executor(MoreExecutors.directExecutor()) + .executor(ImmediateExecutor.INSTANCE) .build(StopWordFactory::getStopWordsFromResource); public static Set getStopWordsForLanguage(Locale locale) diff --git a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java index 0261e8ed5282..2a5fa9a1314b 100644 --- a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java +++ b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java @@ -25,6 +25,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import org.apache.cassandra.concurrent.ImmediateExecutor; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.UUIDGen; @@ -44,12 +45,12 @@ public class HintedHandoffMetrics /** Total number of hints which are not stored, This is not a cache. 
*/ private final LoadingCache notStored = Caffeine.newBuilder() - .executor(MoreExecutors.directExecutor()) + .executor(ImmediateExecutor.INSTANCE) .build(DifferencingCounter::new); /** Total number of hints that have been created, This is not a cache. */ private final LoadingCache createdHintCounts = Caffeine.newBuilder() - .executor(MoreExecutors.directExecutor()) + .executor(ImmediateExecutor.INSTANCE) .build(address -> Metrics.counter(factory.createMetricName("Hints_created-" + address.toString().replace(':', '.')))); public void incrCreatedHints(InetAddressAndPort address) diff --git a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java index 424f5025f9eb..50defd92c50c 100644 --- a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java @@ -25,6 +25,7 @@ import com.codahale.metrics.Meter; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import org.apache.cassandra.concurrent.ImmediateExecutor; import org.apache.cassandra.locator.InetAddressAndPort; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -47,7 +48,7 @@ public final class HintsServiceMetrics /** Histograms per-endpoint of hint delivery delays, This is not a cache. */ private static final LoadingCache delayByEndpoint = Caffeine.newBuilder() - .executor(MoreExecutors.directExecutor()) + .executor(ImmediateExecutor.INSTANCE) .build(address -> Metrics.histogram(factory.createMetricName("Hint_delays-"+address.toString().replace(':', '.')), false)); public static void updateDelayMetrics(InetAddressAndPort endpoint, long delay) diff --git a/src/java/org/apache/cassandra/repair/AbstractRepairTask.java b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java new file mode 100644 index 000000000000..c729cdacfe13 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.repair; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.UUID; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.FutureCombiner; + +public abstract class AbstractRepairTask implements RepairTask +{ + protected static final Logger logger = LoggerFactory.getLogger(AbstractRepairTask.class); + + protected final RepairOption options; + protected final String keyspace; + protected final RepairNotifier notifier; + + protected AbstractRepairTask(RepairOption options, String keyspace, RepairNotifier notifier) + { + this.options = Objects.requireNonNull(options); + this.keyspace = Objects.requireNonNull(keyspace); + this.notifier = Objects.requireNonNull(notifier); + } + + private List submitRepairSessions(UUID parentSession, + boolean isIncremental, + ExecutorPlus executor, + List commonRanges, + String... cfnames) + { + List futures = new ArrayList<>(options.getRanges().size()); + + for (CommonRange commonRange : commonRanges) + { + logger.info("Starting RepairSession for {}", commonRange); + RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, + commonRange, + keyspace, + options.getParallelism(), + isIncremental, + options.isPullRepair(), + options.getPreviewKind(), + options.optimiseStreams(), + executor, + cfnames); + if (session == null) + continue; + session.addCallback(new RepairSessionCallback(session)); + futures.add(session); + } + return futures; + } + + protected Future runRepair(UUID parentSession, + boolean isIncremental, + ExecutorPlus executor, + List commonRanges, + String... 
cfnames) + { + List allSessions = submitRepairSessions(parentSession, isIncremental, executor, commonRanges, cfnames); + List>> ranges = Lists.transform(allSessions, RepairSession::ranges); + Future> f = FutureCombiner.successfulOf(allSessions); + return f.map(results -> { + logger.debug("Repair result: {}", results); + return CoordinatedRepairResult.create(ranges, results); + }); + } + + private class RepairSessionCallback implements FutureCallback + { + private final RepairSession session; + + public RepairSessionCallback(RepairSession session) + { + this.session = session; + } + + public void onSuccess(RepairSessionResult result) + { + String message = String.format("Repair session %s for range %s finished", session.getId(), + session.ranges().toString()); + notifier.notifyProgress(message); + } + + public void onFailure(Throwable t) + { + String message = String.format("Repair session %s for range %s failed with error %s", + session.getId(), session.ranges().toString(), t.getMessage()); + notifier.notifyError(new RuntimeException(message, t)); + } + } +} diff --git a/src/java/org/apache/cassandra/repair/CoordinatedRepairResult.java b/src/java/org/apache/cassandra/repair/CoordinatedRepairResult.java new file mode 100644 index 000000000000..9593acc77489 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/CoordinatedRepairResult.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +public class CoordinatedRepairResult +{ + public final Collection> successfulRanges; + public final Collection> failedRanges; + public final Collection> skippedRanges; + public final Optional> results; + + public CoordinatedRepairResult(Collection> successfulRanges, + Collection> failedRanges, + Collection> skippedRanges, + List results) + { + this.successfulRanges = successfulRanges != null ? ImmutableList.copyOf(successfulRanges) : Collections.emptyList(); + this.failedRanges = failedRanges != null ? ImmutableList.copyOf(failedRanges) : Collections.emptyList(); + this.skippedRanges = skippedRanges != null ? 
ImmutableList.copyOf(skippedRanges) : Collections.emptyList();
+        this.results = Optional.ofNullable(results);
+    }
+
+    public static CoordinatedRepairResult create(List<Collection<Range<Token>>> ranges, List<RepairSessionResult> results)
+    {
+        if (results == null || results.isEmpty())
+            // something went wrong; assume all sessions failed
+            return failed(ranges);
+
+        assert ranges.size() == results.size() : String.format("range size %d != results size %d; ranges: %s, results: %s", ranges.size(), results.size(), ranges, results);
+        Collection<Range<Token>> successfulRanges = new ArrayList<>();
+        Collection<Range<Token>> failedRanges = new ArrayList<>();
+        Collection<Range<Token>> skippedRanges = new ArrayList<>();
+        int index = 0;
+        for (RepairSessionResult sessionResult : results)
+        {
+            if (sessionResult != null)
+            {
+                // don't record successful repair if we had to skip ranges
+                Collection<Range<Token>> replicas = sessionResult.skippedReplicas ? skippedRanges : successfulRanges;
+                replicas.addAll(sessionResult.ranges);
+            }
+            else
+            {
+                // FutureCombiner.successfulOf doesn't keep track of the original futures, but maintains order, so
+                // we can fetch the original session by index
+                failedRanges.addAll(Objects.requireNonNull(ranges.get(index)));
+            }
+            index++;
+        }
+        return new CoordinatedRepairResult(successfulRanges, failedRanges, skippedRanges, results);
+    }
+
+    private static CoordinatedRepairResult failed(@Nullable List<Collection<Range<Token>>> ranges)
+    {
+        Collection<Range<Token>> failedRanges = new ArrayList<>(ranges == null ? 0 : ranges.size());
+        if (ranges != null)
+            ranges.forEach(failedRanges::addAll);
+        return new CoordinatedRepairResult(null, failedRanges, null, null);
+    }
+
+    /**
+     * Utility method for tests to produce a success result; should only be used by tests as syntactic sugar, as all
+     * results must be present else an error is thrown.
+     */
+    @VisibleForTesting
+    public static CoordinatedRepairResult success(List<RepairSessionResult> results)
+    {
+        assert results != null && results.stream().allMatch(a -> a != null) : String.format("results was null or had a null (failed) result: %s", results);
+        List<Collection<Range<Token>>> ranges = Lists.transform(results, a -> a.ranges);
+        return create(ranges, results);
+    }
+
+    public boolean hasFailed()
+    {
+        return !failedRanges.isEmpty();
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/IncrementalRepairTask.java b/src/java/org/apache/cassandra/repair/IncrementalRepairTask.java
new file mode 100644
index 000000000000..c2951d5bf80d
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/IncrementalRepairTask.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.consistent.CoordinatorSession;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Future;
+
+public class IncrementalRepairTask extends AbstractRepairTask
+{
+    private final UUID parentSession;
+    private final RepairRunnable.NeighborsAndRanges neighborsAndRanges;
+    private final String[] cfnames;
+
+    protected IncrementalRepairTask(RepairOption options,
+                                    String keyspace,
+                                    RepairNotifier notifier,
+                                    UUID parentSession,
+                                    RepairRunnable.NeighborsAndRanges neighborsAndRanges,
+                                    String[] cfnames)
+    {
+        super(options, keyspace, notifier);
+        this.parentSession = parentSession;
+        this.neighborsAndRanges = neighborsAndRanges;
+        this.cfnames = cfnames;
+    }
+
+    @Override
+    public String name()
+    {
+        return "Repair";
+    }
+
+    @Override
+    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor) throws Exception
+    {
+        // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
+        Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
+                                                              .addAll(neighborsAndRanges.participants)
+                                                              .add(FBUtilities.getBroadcastAddressAndPort())
+                                                              .build();
+        // Not necessary to include self for filtering. The common ranges only contain neighbor node endpoints.
+        List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames);
+
+        CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants);
+
+        return coordinatorSession.execute(() -> runRepair(parentSession, true, executor, allRanges, cfnames));
+
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/NormalRepairTask.java b/src/java/org/apache/cassandra/repair/NormalRepairTask.java
new file mode 100644
index 000000000000..532271ca2960
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/NormalRepairTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.utils.concurrent.Future;
+
+public class NormalRepairTask extends AbstractRepairTask
+{
+    private final UUID parentSession;
+    private final List<CommonRange> commonRanges;
+    private final String[] cfnames;
+
+    protected NormalRepairTask(RepairOption options,
+                               String keyspace,
+                               RepairNotifier notifier,
+                               UUID parentSession,
+                               List<CommonRange> commonRanges,
+                               String[] cfnames)
+    {
+        super(options, keyspace, notifier);
+        this.parentSession = parentSession;
+        this.commonRanges = commonRanges;
+        this.cfnames = cfnames;
+    }
+
+    @Override
+    public String name()
+    {
+        return "Repair";
+    }
+
+    @Override
+    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor)
+    {
+        return runRepair(parentSession, false, executor, commonRanges, cfnames);
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/PreviewRepairTask.java b/src/java/org/apache/cassandra/repair/PreviewRepairTask.java
new file mode 100644
index 000000000000..e2eb08cc52d6
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/PreviewRepairTask.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.RepairMetrics;
+import org.apache.cassandra.repair.consistent.SyncStatSummary;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.DiagnosticSnapshotService;
+import org.apache.cassandra.utils.concurrent.Future;
+
+public class PreviewRepairTask extends AbstractRepairTask
+{
+    private final UUID parentSession;
+    private final List<CommonRange> commonRanges;
+    private final String[] cfnames;
+
+    protected PreviewRepairTask(RepairOption options, String keyspace, RepairNotifier notifier, UUID parentSession, List<CommonRange> commonRanges, String[] cfnames)
+    {
+        super(options, keyspace, notifier);
+        this.parentSession = parentSession;
+        this.commonRanges = commonRanges;
+        this.cfnames = cfnames;
+    }
+
+    @Override
+    public String name()
+    {
+        return "Repair preview";
+    }
+
+    @Override
+    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor)
+    {
+        Future<CoordinatedRepairResult> f = runRepair(parentSession, false, executor, commonRanges, cfnames);
+        return f.map(result -> {
+            if (result.hasFailed())
+                return result;
+
+            PreviewKind previewKind = options.getPreviewKind();
+            Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE");
+            SyncStatSummary summary = new SyncStatSummary(true);
+            summary.consumeSessionResults(result.results);
+
+            final String message;
+            if (summary.isEmpty())
+            {
+                message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync";
+            }
+            else
+            {
+                message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary;
+                RepairMetrics.previewFailures.inc();
+                if (previewKind == PreviewKind.REPAIRED)
+                    maybeSnapshotReplicas(parentSession, keyspace, result.results.get()); // we know it's present as the summary used it
+            }
+            notifier.notification(message);
+
+            return result;
+        });
+    }
+
+    private void maybeSnapshotReplicas(UUID parentSession, String keyspace, List<RepairSessionResult> results)
+    {
+        if (!DatabaseDescriptor.snapshotOnRepairedDataMismatch())
+            return;
+
+        try
+        {
+            Set<String> mismatchingTables = new HashSet<>();
+            Set<InetAddressAndPort> nodes = new HashSet<>();
+            for (RepairSessionResult sessionResult : results)
+            {
+                for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults))
+                {
+                    for (SyncStat stat : emptyIfNull(repairResult.stats))
+                    {
+                        if (stat.numberOfDifferences > 0)
+                            mismatchingTables.add(repairResult.desc.columnFamily);
+                        // snapshot all replicas, even if they don't have any differences
+                        nodes.add(stat.nodes.coordinator);
+                        nodes.add(stat.nodes.peer);
+                    }
+                }
+            }
+
+            String snapshotName = DiagnosticSnapshotService.getSnapshotName(DiagnosticSnapshotService.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX);
+            for (String table : mismatchingTables)
+            {
+                // we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case)
+                if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName))
+                {
+                    logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}",
+                                options.getPreviewKind().logPrefix(parentSession),
+                                keyspace, table, snapshotName, nodes);
+                    DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(), nodes);
+                }
+                else
+                {
+                    logger.info("{} Not snapshotting {}.{} - snapshot {} exists",
+                                options.getPreviewKind().logPrefix(parentSession),
+                                keyspace, table, snapshotName);
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            logger.error("{} Failed snapshotting replicas", options.getPreviewKind().logPrefix(parentSession), e);
+        }
+    }
+
+    private static <T> Iterable<T> emptyIfNull(Iterable<T> iter)
+    {
+        if (iter == null)
+            return Collections.emptyList();
+        return iter;
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index a57ca84b319c..c78cc73c0c94 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -135,7 +135,7 @@ public void run()
         }
 
         // When all snapshot complete, send validation requests
-        treeResponses = allSnapshotTasks.andThenAsync(endpoints -> {
+        treeResponses = allSnapshotTasks.flatMap(endpoints -> {
             if (parallelismDegree == RepairParallelism.SEQUENTIAL)
                 return sendSequentialValidationRequest(endpoints);
             else
@@ -149,7 +149,7 @@ public void run()
         }
 
         // When all validations complete, submit sync tasks
-        Future<List<SyncStat>> syncResults = treeResponses.andThenAsync(session.optimiseStreams && !session.pullRepair ? this::optimisedSyncing : this::standardSyncing, taskExecutor);
+        Future<List<SyncStat>> syncResults = treeResponses.flatMap(session.optimiseStreams && !session.pullRepair ?
this::optimisedSyncing : this::standardSyncing, taskExecutor);
 
         // When all sync complete, set the final result
         syncResults.addCallback(new FutureCallback<List<SyncStat>>()
diff --git a/src/java/org/apache/cassandra/repair/RepairNotifier.java b/src/java/org/apache/cassandra/repair/RepairNotifier.java
new file mode 100644
index 000000000000..977bc4ee4d5e
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairNotifier.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+public interface RepairNotifier
+{
+    void notification(String message);
+    void notifyError(Throwable error);
+    void notifyProgress(String message);
+}
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 30244a740574..cc9c77072189 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -19,16 +19,24 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.*;
-import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,10 +58,7 @@
 import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.metrics.RepairMetrics;
 import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.repair.consistent.CoordinatorSession;
-import org.apache.cassandra.repair.consistent.SyncStatSummary;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.SystemDistributedKeyspace;
@@ -61,20 +66,16 @@
 import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
 import org.apache.cassandra.service.ClientState;
 import
org.apache.cassandra.service.StorageService; -import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.TraceKeyspace; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.DiagnosticSnapshotService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.WrappedRunnable; import org.apache.cassandra.utils.concurrent.Future; -import org.apache.cassandra.utils.concurrent.FutureCombiner; -import org.apache.cassandra.utils.concurrent.ImmediateFuture; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventNotifier; import org.apache.cassandra.utils.progress.ProgressEventType; @@ -85,7 +86,7 @@ import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; import static org.apache.cassandra.utils.Clock.Global.nanoTime; -public class RepairRunnable implements Runnable, ProgressEventNotifier +public class RepairRunnable implements Runnable, ProgressEventNotifier, RepairNotifier { private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class); @@ -141,26 +142,14 @@ protected void fireProgressEvent(ProgressEvent event) } } + @Override public void notification(String msg) { logger.info(msg); fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, progressCounter.get(), totalProgress, msg)); } - private void skip(String msg) - { - notification("Repair " + parentSession + " skipped: " + msg); - success(msg); - } - - private void success(String msg) - { - fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progressCounter.get(), totalProgress, msg)); - ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED, - ImmutableList.of(msg)); - complete(null); - } - + @Override public void notifyError(Throwable error) { // exception should be ignored @@ -187,6 +176,30 @@ public void notifyError(Throwable error) maybeStoreParentRepairFailure(error); } + @Override + public void notifyProgress(String message) + { + logger.info(message); + fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS, + progressCounter.incrementAndGet(), + totalProgress, + message)); + } + + private void skip(String msg) + { + notification("Repair " + parentSession + " skipped: " + msg); + success(msg); + } + + private void success(String msg) + { + fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progressCounter.get(), totalProgress, msg)); + ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED, + ImmutableList.of(msg)); + complete(null); + } + private void fail(String reason) { if (reason == null) @@ -397,264 +410,51 @@ private void prepare(List columnFamilies, Set commonRanges, - Set preparedEndpoints, - String... cfnames) - { - - // Set up RepairJob executor for this repair command. 
ExecutorPlus executor = createExecutor(); - - // Setting the repairedAt time to UNREPAIRED_SSTABLE causes the repairedAt times to be preserved across streamed sstables - final Future> allSessions = submitRepairSessions(parentSession, false, executor, commonRanges, cfnames); - - // After all repair sessions completes(successful or not), - // run anticompaction if necessary and send finish notice back to client - final Collection> successfulRanges = new ArrayList<>(); - final AtomicBoolean hasFailure = new AtomicBoolean(); - allSessions.andThenAsync(results -> { - logger.debug("Repair result: {}", results); - // filter out null(=failed) results and get successful ranges - for (RepairSessionResult sessionResult : results) + Future f = task.perform(executor); + f.addCallback((result, failure) -> { + try { - if (sessionResult != null) + if (failure != null) { - // don't record successful repair if we had to skip ranges - if (!sessionResult.skippedReplicas) - { - successfulRanges.addAll(sessionResult.ranges); - } + notifyError(failure); + fail(failure.getMessage()); } else { - hasFailure.compareAndSet(false, true); - } - } - return ImmediateFuture.success(null); - }).addCallback(new RepairCompleteCallback(parentSession, - successfulRanges, - preparedEndpoints, - traceState, - hasFailure, - executor)); - } - - private void incrementalRepair(UUID parentSession, - TraceState traceState, - NeighborsAndRanges neighborsAndRanges, - Set preparedEndpoints, - String... cfnames) - { - // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted - Set allParticipants = ImmutableSet.builder() - .addAll(neighborsAndRanges.participants) - .add(FBUtilities.getBroadcastAddressAndPort()) - .build(); - // Not necessary to include self for filtering. The common ranges only contains neighbhor node endpoints. - List allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames); - - CoordinatorSession coordinatorSession; - try - { - coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants); - } - catch (NoSuchRepairSessionException e) - { - logger.warn("Aborting repair session: "+e.getMessage()); - fail(e.getMessage()); - return; - } - ExecutorPlus executor = createExecutor(); - AtomicBoolean hasFailure = new AtomicBoolean(false); - Future repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames), - hasFailure); - Collection> ranges = new HashSet<>(); - for (Collection> range : Iterables.transform(allRanges, cr -> cr.ranges)) - { - ranges.addAll(range); - } - repairResult.addCallback(new RepairCompleteCallback(parentSession, ranges, preparedEndpoints, traceState, hasFailure, executor)); - } - - private void previewRepair(UUID parentSession, - List commonRanges, - Set preparedEndpoints, - String... cfnames) - { - - logger.debug("Starting preview repair for {}", parentSession); - // Set up RepairJob executor for this repair command. 
- ExecutorPlus executor = createExecutor(); - - final Future> allSessions = submitRepairSessions(parentSession, false, executor, commonRanges, cfnames); - - allSessions.addCallback(new FutureCallback>() - { - public void onSuccess(List results) - { - try - { - if (results == null || results.stream().anyMatch(s -> s == null)) + maybeStoreParentRepairSuccess(result.successfulRanges); + if (result.hasFailed()) { - // something failed fail(null); - return; - } - PreviewKind previewKind = options.getPreviewKind(); - Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE"); - SyncStatSummary summary = new SyncStatSummary(true); - summary.consumeSessionResults(results); - - final String message; - if (summary.isEmpty()) - { - message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync"; } else { - message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary; - RepairMetrics.previewFailures.inc(); - if (previewKind == PreviewKind.REPAIRED) - maybeSnapshotReplicas(parentSession, keyspace, results); + success(task.name() + " completed successfully"); + ActiveRepairService.instance.cleanUp(parentSession, neighborsAndRanges.participants); } - notification(message); - - success("Repair preview completed successfully"); - ActiveRepairService.instance.cleanUp(parentSession, preparedEndpoints); - } - catch (Throwable t) - { - logger.error("Error completing preview repair", t); - onFailure(t); - } - finally - { - executor.shutdownNow(); } } - - public void onFailure(Throwable t) + finally { - notifyError(t); - fail("Error completing preview repair: " + t.getMessage()); executor.shutdownNow(); } }); } - private void maybeSnapshotReplicas(UUID parentSession, String keyspace, List results) - { - if (!DatabaseDescriptor.snapshotOnRepairedDataMismatch()) - return; - - try - { - Set mismatchingTables = new HashSet<>(); - Set nodes = new HashSet<>(); - for (RepairSessionResult sessionResult : results) - { - for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults)) - { - for (SyncStat stat : emptyIfNull(repairResult.stats)) - { - if (stat.numberOfDifferences > 0) - mismatchingTables.add(repairResult.desc.columnFamily); - // snapshot all replicas, even if they don't have any differences - nodes.add(stat.nodes.coordinator); - nodes.add(stat.nodes.peer); - } - } - } - - String snapshotName = DiagnosticSnapshotService.getSnapshotName(DiagnosticSnapshotService.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX); - for (String table : mismatchingTables) - { - // we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case) - if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName)) - { - logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}", - options.getPreviewKind().logPrefix(parentSession), - keyspace, table, snapshotName, nodes); - DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(), nodes); - } - else - { - logger.info("{} Not snapshotting {}.{} - snapshot {} exists", - options.getPreviewKind().logPrefix(parentSession), - keyspace, table, snapshotName); - } - } - } - catch (Exception e) - { - logger.error("{} Failed snapshotting replicas", options.getPreviewKind().logPrefix(parentSession), e); - } - } - - private static Iterable emptyIfNull(Iterable iter) - { - if (iter == null) - return 
Collections.emptyList(); - return iter; - } - - private Future> submitRepairSessions(UUID parentSession, - boolean isIncremental, - ExecutorPlus executor, - List commonRanges, - String... cfnames) - { - List> futures = new ArrayList<>(options.getRanges().size()); - - for (CommonRange commonRange : commonRanges) - { - logger.info("Starting RepairSession for {}", commonRange); - RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, - commonRange, - keyspace, - options.getParallelism(), - isIncremental, - options.isPullRepair(), - options.getPreviewKind(), - options.optimiseStreams(), - executor, - cfnames); - if (session == null) - continue; - session.addCallback(new RepairSessionCallback(session)); - futures.add(session); - } - return FutureCombiner.successfulOf(futures); - } - private ExecutorPlus createExecutor() { return executorFactory() @@ -663,81 +463,6 @@ private ExecutorPlus createExecutor() .pooled("Repair#" + cmd, options.getJobThreads()); } - private class RepairSessionCallback implements FutureCallback - { - private final RepairSession session; - - public RepairSessionCallback(RepairSession session) - { - this.session = session; - } - - public void onSuccess(RepairSessionResult result) - { - String message = String.format("Repair session %s for range %s finished", session.getId(), - session.ranges().toString()); - logger.info(message); - fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS, - progressCounter.incrementAndGet(), - totalProgress, - message)); - } - - public void onFailure(Throwable t) - { - String message = String.format("Repair session %s for range %s failed with error %s", - session.getId(), session.ranges().toString(), t.getMessage()); - notifyError(new RuntimeException(message, t)); - } - } - - private class RepairCompleteCallback implements FutureCallback - { - final UUID parentSession; - final Collection> successfulRanges; - final Set preparedEndpoints; - final TraceState traceState; - final AtomicBoolean hasFailure; - final ExecutorService executor; - - public RepairCompleteCallback(UUID parentSession, - Collection> successfulRanges, - Set preparedEndpoints, - TraceState traceState, - AtomicBoolean hasFailure, - ExecutorService executor) - { - this.parentSession = parentSession; - this.successfulRanges = successfulRanges; - this.preparedEndpoints = preparedEndpoints; - this.traceState = traceState; - this.hasFailure = hasFailure; - this.executor = executor; - } - - public void onSuccess(Object result) - { - maybeStoreParentRepairSuccess(successfulRanges); - if (hasFailure.get()) - { - fail(null); - } - else - { - success("Repair completed successfully"); - ActiveRepairService.instance.cleanUp(parentSession, preparedEndpoints); - } - executor.shutdownNow(); - } - - public void onFailure(Throwable t) - { - notifyError(t); - fail(t.getMessage()); - executor.shutdownNow(); - } - } - private static void addRangeToNeighbors(List neighborRangeList, Range range, EndpointsForRange neighbors) { Set endpoints = neighbors.endpoints(); @@ -842,9 +567,9 @@ private static final class SkipRepairException extends RuntimeException static final class NeighborsAndRanges { - private final boolean shouldExcludeDeadParticipants; - private final Set participants; - private final List commonRanges; + final boolean shouldExcludeDeadParticipants; + final Set participants; + final List commonRanges; NeighborsAndRanges(boolean shouldExcludeDeadParticipants, Set participants, List commonRanges) { diff --git 
a/src/java/org/apache/cassandra/repair/RepairTask.java b/src/java/org/apache/cassandra/repair/RepairTask.java new file mode 100644 index 000000000000..12d65c10e461 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/RepairTask.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; + +public interface RepairTask +{ + String name(); + + Future performUnsafe(ExecutorPlus executor) throws Exception; + + default Future perform(ExecutorPlus executor) + { + try + { + return performUnsafe(executor); + } + catch (Exception | Error e) + { + JVMStabilityInspector.inspectThrowable(e); + return ImmediateFuture.failure(e); + } + } +} diff --git a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java b/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java index 4b077b836a11..13ee0ae9ed0b 100644 --- a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java +++ b/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java @@ -28,4 +28,9 @@ public class SomeRepairFailedException extends RuntimeException { public static final SomeRepairFailedException INSTANCE = new SomeRepairFailedException(); + + private SomeRepairFailedException() + { + super(null, null, false, false); + } } diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java index 1027d0aede80..24e24faed1cc 100644 --- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java +++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java @@ -19,21 +19,18 @@ package org.apache.cassandra.repair.consistent; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; -import javax.annotation.Nullable; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.FutureCallback; + +import org.apache.cassandra.concurrent.ImmediateExecutor; +import org.apache.cassandra.repair.CoordinatedRepairResult; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Future; -import org.apache.cassandra.utils.concurrent.ImmediateFuture; -import org.apache.cassandra.utils.concurrent.Promise; + import org.apache.commons.lang3.time.DurationFormatUtils; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -43,7 +40,6 @@ import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.Verb; -import org.apache.cassandra.repair.RepairSessionResult; import org.apache.cassandra.repair.SomeRepairFailedException; import org.apache.cassandra.repair.messages.FailSession; import org.apache.cassandra.repair.messages.FinalizeCommit; @@ -64,8 +60,8 @@ public class CoordinatorSession extends ConsistentSession private static final Logger logger = LoggerFactory.getLogger(CoordinatorSession.class); private final Map participantStates = new HashMap<>(); - private final AsyncPromise prepareFuture = AsyncPromise.uncancellable(); - private final AsyncPromise finalizeProposeFuture = AsyncPromise.uncancellable(); + private final AsyncPromise prepareFuture = AsyncPromise.uncancellable(); + private final AsyncPromise finalizeProposeFuture = AsyncPromise.uncancellable(); private volatile long sessionStart = Long.MIN_VALUE; private volatile long repairStart = Long.MIN_VALUE; @@ -148,7 +144,7 @@ protected void sendMessage(InetAddressAndPort destination, Message prepare() + public Future prepare() { Preconditions.checkArgument(allStates(State.PREPARING)); @@ -188,12 +184,11 @@ public synchronized void handlePrepareResponse(InetAddressAndPort participant, b if (getState() == State.PREPARED) { logger.info("Incremental repair session {} successfully prepared.", sessionID); - prepareFuture.trySuccess(true); + prepareFuture.trySuccess(null); } else { fail(); - prepareFuture.trySuccess(false); } } @@ -202,7 +197,7 @@ public synchronized void setRepairing() setAll(State.REPAIRING); } - public synchronized Future finalizePropose() + public synchronized Future finalizePropose() { Preconditions.checkArgument(allStates(State.REPAIRING)); logger.info("Proposing finalization of repair session {}", sessionID); @@ -224,7 +219,6 @@ else if (!success) { logger.warn("Finalization proposal of session {} rejected by {}. Aborting session", sessionID, participant); fail(); - finalizeProposeFuture.trySuccess(false); } else { @@ -233,7 +227,7 @@ else if (!success) if (getState() == State.FINALIZE_PROMISED) { logger.info("Finalization proposal for repair session {} accepted by all participants.", sessionID); - finalizeProposeFuture.trySuccess(true); + finalizeProposeFuture.trySuccess(null); } } } @@ -287,103 +281,51 @@ private static String formatDuration(long then, long now) /** * Runs the asynchronous consistent repair session. 
Actual repair sessions are scheduled via a submitter to make unit testing easier */ - public Future execute(Supplier>> sessionSubmitter, AtomicBoolean hasFailure) + public Future execute(Supplier> sessionSubmitter) { logger.info("Beginning coordination of incremental repair session {}", sessionID); sessionStart = currentTimeMillis(); - Future prepareResult = prepare(); + Future prepareResult = prepare(); // run repair sessions normally - Future> repairSessionResults = prepareResult.andThenAsync(success -> - { - if (success) - { - repairStart = currentTimeMillis(); - if (logger.isDebugEnabled()) - { - logger.debug("Incremental repair {} prepare phase completed in {}", sessionID, formatDuration(sessionStart, repairStart)); - } - setRepairing(); - return sessionSubmitter.get(); - } - else - { - return ImmediateFuture.success(null); - } + Future repairSessionResults = prepareResult.flatMap(ignore -> { + repairStart = currentTimeMillis(); + if (logger.isDebugEnabled()) + logger.debug("Incremental repair {} prepare phase completed in {}", sessionID, formatDuration(sessionStart, repairStart)); + setRepairing(); + return sessionSubmitter.get(); }); - // mark propose finalization - Future proposeFuture = repairSessionResults.andThenAsync(results -> - { - if (results == null || results.isEmpty() || Iterables.any(results, r -> r == null)) + // if any session failed, then fail the future + Future onlySuccessSessionResults = repairSessionResults.map(result -> { + finalizeStart = currentTimeMillis(); + if (result.hasFailed()) { - finalizeStart = currentTimeMillis(); if (logger.isDebugEnabled()) - { logger.debug("Incremental repair {} validation/stream phase completed in {}", sessionID, formatDuration(repairStart, finalizeStart)); - } - return ImmediateFuture.failure(SomeRepairFailedException.INSTANCE); - } - else - { - return finalizePropose(); + throw SomeRepairFailedException.INSTANCE; } + return result; }); - // return execution result as set by following callback - Promise resultFuture = AsyncPromise.uncancellable(); - - // commit repaired data - proposeFuture.addCallback(new FutureCallback() - { - public void onSuccess(@Nullable Boolean result) - { - try - { - if (result != null && result) - { - if (logger.isDebugEnabled()) - { - logger.debug("Incremental repair {} finalization phase completed in {}", sessionID, formatDuration(finalizeStart, currentTimeMillis())); - } - finalizeCommit(); - if (logger.isDebugEnabled()) - { - logger.debug("Incremental repair {} phase completed in {}", sessionID, formatDuration(sessionStart, currentTimeMillis())); - } - } - else - { - hasFailure.set(true); - fail(); - } - resultFuture.trySuccess(result); - } - catch (Exception e) - { - resultFuture.tryFailure(e); - } - } - - public void onFailure(Throwable t) + // mark propose finalization and commit + Future proposeFuture = onlySuccessSessionResults.flatMap(results -> finalizePropose().map(ignore -> { + if (logger.isDebugEnabled()) + logger.debug("Incremental repair {} finalization phase completed in {}", sessionID, formatDuration(finalizeStart, currentTimeMillis())); + finalizeCommit(); + if (logger.isDebugEnabled()) + logger.debug("Incremental repair {} phase completed in {}", sessionID, formatDuration(sessionStart, currentTimeMillis())); + return results; + })); + + return proposeFuture.addCallback((ignore, failure) -> { + if (failure != null) { - try - { - if (logger.isDebugEnabled()) - { - logger.debug("Incremental repair {} phase failed in {}", sessionID, formatDuration(sessionStart, currentTimeMillis())); - } 
- hasFailure.set(true); - fail(); - } - finally - { - resultFuture.tryFailure(t); - } + if (logger.isDebugEnabled()) + logger.debug("Incremental repair {} phase failed in {}", sessionID, formatDuration(sessionStart, currentTimeMillis())); + fail(); } - }); - - return resultFuture; + }, ImmediateExecutor.INSTANCE); } } diff --git a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java index b145fb664d2f..249d1a4b11c5 100644 --- a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java +++ b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import com.google.common.collect.Lists; @@ -179,11 +180,11 @@ public void consumeRepairResult(RepairResult result) summaries.get(cf).consumeStats(result.stats); } - public void consumeSessionResults(List results) + public void consumeSessionResults(Optional> results) { - if (results != null) + if (results.isPresent()) { - filter(results, Objects::nonNull).forEach(r -> filter(r.repairJobResults, Objects::nonNull).forEach(this::consumeRepairResult)); + filter(results.get(), Objects::nonNull).forEach(r -> filter(r.repairJobResults, Objects::nonNull).forEach(this::consumeRepairResult)); } } diff --git a/src/java/org/apache/cassandra/security/CipherFactory.java b/src/java/org/apache/cassandra/security/CipherFactory.java index 3c13629208d0..c42ebcaea5a8 100644 --- a/src/java/org/apache/cassandra/security/CipherFactory.java +++ b/src/java/org/apache/cassandra/security/CipherFactory.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import io.netty.util.concurrent.FastThreadLocal; +import org.apache.cassandra.concurrent.ImmediateExecutor; import org.apache.cassandra.config.TransparentDataEncryptionOptions; /** @@ -81,7 +82,7 @@ public CipherFactory(TransparentDataEncryptionOptions options) cache = Caffeine.newBuilder() // by default cache is unbounded .maximumSize(64) // a value large enough that we should never even get close (so nothing gets evicted) - .executor(MoreExecutors.directExecutor()) + .executor(ImmediateExecutor.INSTANCE) .removalListener((key, value, cause) -> { // maybe reload the key? 
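The CipherFactory hunk above keeps its Caffeine key cache on a same-thread executor so eviction and removal callbacks run inline rather than on the default ForkJoinPool. A minimal, self-contained sketch of that wiring, with Runnable::run standing in for Cassandra's ImmediateExecutor.INSTANCE:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public final class SameThreadCacheSketch
{
    public static void main(String[] args)
    {
        Cache<String, byte[]> cache = Caffeine.newBuilder()
                                              .maximumSize(64)
                                              .executor(Runnable::run) // same-thread executor, nothing deferred to ForkJoinPool
                                              .removalListener((key, value, cause) ->
                                                  System.out.println("removed " + key + " because " + cause))
                                              .build();
        cache.put("key", new byte[8]);
        cache.invalidate("key"); // removal listener fires inline on the calling thread
    }
}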
(to avoid the reload being on the user's dime) diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 2532c8b8cb3b..fbcb745434d6 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -340,17 +340,10 @@ public RepairSession submitRepairSession(UUID parentRepairSession, LocalSessions.registerListener(session); // remove session at completion - session.addListener(new Runnable() - { - /** - * When repair finished, do clean up - */ - public void run() - { - sessions.remove(session.getId()); - LocalSessions.unregisterListener(session); - } - }, MoreExecutors.directExecutor()); + session.addListener(() -> { + sessions.remove(session.getId()); + LocalSessions.unregisterListener(session); + }); session.start(executor); return session; } diff --git a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java index b8944f9feefb..86e3c12ea9c4 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java +++ b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java @@ -280,7 +280,14 @@ public AbstractFuture addCallback(FutureCallback callback) @Override public AbstractFuture addCallback(BiConsumer callback) { - appendListener(new CallbackBiConsumerListener<>(this, callback)); + appendListener(new CallbackBiConsumerListener<>(this, callback, null)); + return this; + } + + @Override + public Future addCallback(BiConsumer callback, Executor executor) + { + appendListener(new CallbackBiConsumerListener<>(this, callback, executor)); return this; } @@ -305,19 +312,42 @@ public AbstractFuture addCallback(FutureCallback callback, Executo @Override public AbstractFuture addCallback(Consumer onSuccess, Consumer onFailure) { - appendListener(new CallbackLambdaListener<>(this, onSuccess, onFailure)); + appendListener(new CallbackLambdaListener<>(this, onSuccess, onFailure, null)); return this; } /** - * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively + * Support more fluid version of {@link com.google.common.util.concurrent.Futures#addCallback} * * See {@link #addListener(GenericFutureListener)} for ordering semantics. */ @Override - public Future andThenAsync(Function> andThen) + public AbstractFuture addCallback(Consumer onSuccess, Consumer onFailure, Executor executor) { - return andThenAsync(andThen, null); + appendListener(new CallbackLambdaListener<>(this, onSuccess, onFailure, executor)); + return this; + } + + /** + * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively + * + * See {@link #addListener(GenericFutureListener)} for ordering semantics. + */ + protected Future map(AbstractFuture result, Function mapper, @Nullable Executor executor) + { + addListener(() -> { + try + { + if (isSuccess()) result.trySet(mapper.apply(getNow())); + else result.tryFailure(cause()); + } + catch (Throwable t) + { + result.tryFailure(t); + throw t; + } + }, executor); + return result; } /** @@ -325,12 +355,12 @@ public Future andThenAsync(Function> andTh * * See {@link #addListener(GenericFutureListener)} for ordering semantics. 
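The new protected map(...) helper above completes a downstream future from the upstream result and routes mapper exceptions onto the downstream failure path. A sketch of that pattern, assuming CompletableFuture as a stand-in for the patch's AbstractFuture/AsyncFuture result types:

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public final class MapHelperSketch
{
    static <V, T> CompletableFuture<T> map(CompletableFuture<V> upstream, Function<? super V, ? extends T> mapper)
    {
        CompletableFuture<T> downstream = new CompletableFuture<>();
        upstream.whenComplete((value, failure) -> {
            try
            {
                if (failure == null) downstream.complete(mapper.apply(value)); // success: apply the mapper
                else downstream.completeExceptionally(failure);                // upstream failed: propagate the cause
            }
            catch (Throwable t)
            {
                downstream.completeExceptionally(t);                           // mapper threw: fail the downstream future
            }
        });
        return downstream;
    }

    public static void main(String[] args)
    {
        System.out.println(map(CompletableFuture.completedFuture(21), v -> v * 2).join()); // prints 42
    }
}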
*/ - protected Future andThenAsync(AbstractFuture result, Function> andThen, @Nullable Executor executor) + protected Future flatMap(AbstractFuture result, Function> flatMapper, @Nullable Executor executor) { addListener(() -> { try { - if (isSuccess()) andThen.apply(getNow()).addListener(propagate(result)); + if (isSuccess()) flatMapper.apply(getNow()).addListener(propagate(result)); else result.tryFailure(cause()); } catch (Throwable t) diff --git a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java index a7b7a6ab0167..b09eeb714bdd 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java +++ b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java @@ -122,15 +122,26 @@ void appendListener(ListenerList newListener) ListenerList.notify(listenersUpdater, this); } + /** + * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively + * + * See {@link #addListener(GenericFutureListener)} for ordering semantics. + */ + @Override + public Future map(Function mapper, Executor executor) + { + return map(new AsyncFuture<>(), mapper, executor); + } + /** * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively * * See {@link #addListener(GenericFutureListener)} for ordering semantics. */ @Override - public Future andThenAsync(Function> andThen, @Nullable Executor executor) + public Future flatMap(Function> flatMapper, @Nullable Executor executor) { - return andThenAsync(new AsyncFuture<>(), andThen, executor); + return flatMap(new AsyncFuture<>(), flatMapper, executor); } /** diff --git a/src/java/org/apache/cassandra/utils/concurrent/Future.java b/src/java/org/apache/cassandra/utils/concurrent/Future.java index 22b15cc06956..69dc83d87fec 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Future.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Future.java @@ -115,6 +115,11 @@ default boolean awaitUninterruptibly(long l) */ Future addCallback(BiConsumer callback); + /** + * Support {@link com.google.common.util.concurrent.Futures#addCallback} natively + */ + Future addCallback(BiConsumer callback, Executor executor); + /** * Support {@link com.google.common.util.concurrent.Futures#addCallback} natively */ @@ -130,15 +135,36 @@ default boolean awaitUninterruptibly(long l) */ Future addCallback(Consumer onSuccess, Consumer onFailure); + /** + * Support {@link com.google.common.util.concurrent.Futures#addCallback} natively + */ + Future addCallback(Consumer onSuccess, Consumer onFailure, Executor executor); + + /** + * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively + */ + default Future map(Function mapper) + { + return map(mapper, null); + } + + /** + * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively + */ + Future map(Function mapper, Executor executor); + /** * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively */ - Future andThenAsync(Function> andThen); + default Future flatMap(Function> flatMapper) + { + return flatMap(flatMapper, null); + } /** * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively */ - 
Future andThenAsync(Function> andThen, Executor executor); + Future flatMap(Function> flatMapper, Executor executor); /** * Invoke {@code runnable} on completion, using {@code executor}. diff --git a/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java b/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java index c150ea2a2f3f..57737eaf3809 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java +++ b/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiConsumer; import java.util.function.Consumer; - import javax.annotation.Nullable; import com.google.common.util.concurrent.FutureCallback; @@ -151,7 +150,7 @@ static > void notifyListener(Generi static > void notifyListener(Executor notifyExecutor, GenericFutureListener listener, F future) { if (notifyExecutor == null) notifyListener(listener, future); - else notifyExecutor.execute(() -> notifyListener(listener, future)); + else safeExecute(notifyExecutor, () -> notifyListener(listener, future)); } /** @@ -159,8 +158,22 @@ static > void notifyListener(Execut */ static void notifyListener(@Nullable Executor notifyExecutor, Runnable listener) { - if (notifyExecutor == null) ImmediateExecutor.INSTANCE.execute(listener); - else notifyExecutor.execute(listener); + safeExecute(notifyExecutor, listener); + } + + private static void safeExecute(@Nullable Executor notifyExecutor, Runnable runnable) + { + if (notifyExecutor == null) + notifyExecutor = ImmediateExecutor.INSTANCE; + try + { + notifyExecutor.execute(runnable); + } + catch (Exception | Error e) + { + // TODO: suboptimal package interdependency - move FutureTask etc here? + ExecutionFailure.handle(e); + } } /** @@ -219,11 +232,13 @@ static class CallbackBiConsumerListener extends ListenerList implements Ru { final Future future; final BiConsumer callback; + final Executor executor; - CallbackBiConsumerListener(Future future, BiConsumer callback) + CallbackBiConsumerListener(Future future, BiConsumer callback, Executor executor) { this.future = future; this.callback = callback; + this.executor = executor; } @Override @@ -236,7 +251,7 @@ public void run() @Override void notifySelf(Executor notifyExecutor, Future future) { - notifyListener(notifyExecutor, this); + notifyListener(executor == null ? notifyExecutor : executor, this); } } @@ -269,12 +284,14 @@ static class CallbackLambdaListener extends ListenerList implements Runnab final Future future; final Consumer onSuccess; final Consumer onFailure; + final Executor executor; - CallbackLambdaListener(Future future, Consumer onSuccess, Consumer onFailure) + CallbackLambdaListener(Future future, Consumer onSuccess, Consumer onFailure, Executor executor) { this.future = future; this.onSuccess = onSuccess; this.onFailure = onFailure; + this.executor = executor; } @Override @@ -287,7 +304,7 @@ public void run() @Override void notifySelf(Executor notifyExecutor, Future future) { - notifyListener(notifyExecutor, this); + notifyListener(executor == null ? 
notifyExecutor : executor, this); } } diff --git a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java index 963596976aef..43648c0893e8 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java +++ b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java @@ -89,15 +89,26 @@ protected SyncFuture(FailureHolder initialState, GenericFutureListener Future map(Function mapper, Executor executor) + { + return map(new SyncFuture<>(), mapper, executor); + } + /** * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively * * See {@link #addListener(GenericFutureListener)} for ordering semantics. */ @Override - public Future andThenAsync(Function> andThen, @Nullable Executor executor) + public Future flatMap(Function> flatMapper, @Nullable Executor executor) { - return andThenAsync(new SyncFuture<>(), andThen, executor); + return flatMap(new SyncFuture<>(), flatMapper, executor); } /** diff --git a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java index baada12e7465..0bf9b9fe6b4b 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java +++ b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java @@ -77,8 +77,10 @@ protected void executeTest(final boolean streamEntireSSTables, final Future startupRunnable = executorService.submit((Runnable) secondNode::startup); final Future netstatsFuture = executorService.submit(new NetstatsCallable(cluster.get(1))); + startupRunnable.get(3, MINUTES); + // 1m is a bit much, but should be fine on slower environments. Node2 can't come up without streaming + // completing, so if node2 is up 1m is enough time for the nodetool watcher to yield final AbstractNetstatsStreaming.NetstatResults results = netstatsFuture.get(1, MINUTES); - startupRunnable.get(2, MINUTES); results.assertSuccessful(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/ForceRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/repair/ForceRepairTest.java new file mode 100644 index 000000000000..479dac318338 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/repair/ForceRepairTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
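ListenerList.safeExecute(...), added a little earlier in this patch, falls back to a same-thread executor when none is supplied and makes sure a rejecting executor is handled rather than silently dropping the listener. A sketch of that fallback, with Runnable::run and printStackTrace standing in for Cassandra's ImmediateExecutor.INSTANCE and ExecutionFailure.handle:

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

public final class SafeExecuteSketch
{
    static void safeExecute(Executor maybeExecutor, Runnable listener)
    {
        // fall back to a same-thread executor when the caller did not supply one
        Executor executor = maybeExecutor != null ? maybeExecutor : (Executor) Runnable::run;
        try
        {
            executor.execute(listener);
        }
        catch (Exception | Error e)
        {
            // placeholder for ExecutionFailure.handle(e): a broken executor must not lose the listener silently
            e.printStackTrace();
        }
    }

    public static void main(String[] args)
    {
        safeExecute(null, () -> System.out.println("ran inline on the calling thread"));
        safeExecute(task -> { throw new RejectedExecutionException("executor shut down"); },
                    () -> System.out.println("never runs, but the rejection is handled"));
    }
}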
+ */ +package org.apache.cassandra.distributed.test.repair; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.ArrayUtils; +import org.junit.Test; + +import com.carrotsearch.hppc.LongArrayList; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.QueryResults; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.shared.AssertUtils; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.assertj.core.api.Assertions; + +public class ForceRepairTest extends TestBaseImpl +{ + /** + * Port of python dtest "repair_tests/incremental_repair_test.py::TestIncRepair::test_force" but extends to test + * all types of repair. + */ + @Test + public void force() throws IOException + { + force(false); + } + + @Test + public void forceWithDifference() throws IOException + { + force(true); + } + + private void force(boolean includeDifference) throws IOException + { + long nowInMicro = System.currentTimeMillis() * 1000; + try (Cluster cluster = Cluster.build(3) + .withConfig(c -> c.set("hinted_handoff_enabled", false) + .with(Feature.values())) + .start()) + { + init(cluster); + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k INT PRIMARY KEY, v INT)")); + + for (int i = 0; i < 10; i++) + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k,v) VALUES (?, ?) USING TIMESTAMP ?"), ConsistencyLevel.ALL, i, i, nowInMicro++); + + ClusterUtils.stopUnchecked(cluster.get(2)); + + // repair should fail because node2 is down + IInvokableInstance node1 = cluster.get(1); + + for (String[] args : Arrays.asList(new String[]{ "--full" }, + new String[]{ "--full", "--preview" }, + new String[]{ "--full", "--validate"}, // nothing should be in the repaired set, so shouldn't stream + new String[]{ "--preview" }, // IR Preview + new String[]{ "--validate"}, // nothing should be in the repaired set, so shouldn't stream + new String[0])) // IR + { + if (includeDifference) + node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (k,v) VALUES (?, ?) 
USING TIMESTAMP ?"), -1, -1, nowInMicro++); // each loop should have a different timestamp, causing a new difference + + try + { + node1.nodetoolResult(ArrayUtils.addAll(new String[] {"repair", KEYSPACE}, args)).asserts().failure(); + node1.nodetoolResult(ArrayUtils.addAll(new String[] {"repair", KEYSPACE, "--force"}, args)).asserts().success(); + + assertNoRepairedAt(cluster); + } + catch (Exception | Error e) + { + // tag the error to include which args broke + e.addSuppressed(new AssertionError("Failure for args: " + Arrays.toString(args))); + throw e; + } + } + + if (includeDifference) + { + SimpleQueryResult expected = QueryResults.builder() + .row(-1, -1) + .build(); + for (IInvokableInstance node : Arrays.asList(node1, cluster.get(3))) + { + SimpleQueryResult results = node.executeInternalWithResult(withKeyspace("SELECT * FROM %s.tbl WHERE k=?"), -1); + expected.reset(); + AssertUtils.assertRows(results, expected); + } + } + } + } + + private static void assertNoRepairedAt(Cluster cluster) + { + List repairedAt = getRepairedAt(cluster, KEYSPACE, "tbl"); + Assertions.assertThat(repairedAt).hasSize(cluster.size()); + for (int i = 0; i < repairedAt.size(); i++) + { + long[] array = repairedAt.get(i); + if (array == null) + { + // ignore downed nodes + Assertions.assertThat(cluster.get(i + 1).isShutdown()).isTrue(); + continue; + } + Assertions.assertThat(array).isNotEmpty(); + for (long a : array) + Assertions.assertThat(a).describedAs("node%d had a repaired sstable", i + 1).isEqualTo(0); + } + } + + private static List getRepairedAt(Cluster cluster, String keyspace, String table) + { + return cluster.stream().map(i -> { + if (i.isShutdown()) + return null; + + return i.callOnInstance(() -> { + TableMetadata meta = Schema.instance.getTableMetadata(keyspace, table); + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(meta.id); + + View view = cfs.getTracker().getView(); + LongArrayList list = new LongArrayList(); + for (SSTableReader sstable : view.liveSSTables()) + { + try + { + StatsMetadata metadata = sstable.getSSTableMetadata(); + list.add(metadata.repairedAt); + } + catch (Exception e) + { + // ignore + } + } + return list.toArray(); + }); + }).collect(Collectors.toList()); + } +} diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java index ef08022836b1..2bfd4dc8d658 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java @@ -20,7 +20,6 @@ import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -31,6 +30,8 @@ import java.util.function.Supplier; import com.google.common.collect.Lists; + +import org.apache.cassandra.repair.CoordinatedRepairResult; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.Promise; @@ -106,29 +107,27 @@ public void testMockedMessagingHappyPath() throws InterruptedException, Executio UUID uuid = registerSession(cfs, true, true); CoordinatorSession coordinator = ActiveRepairService.instance.consistent.coordinated.registerSession(uuid, PARTICIPANTS, false); AtomicBoolean repairSubmitted = new AtomicBoolean(false); - Promise> repairFuture = 
AsyncPromise.uncancellable(); - Supplier>> sessionSupplier = () -> + Promise repairFuture = AsyncPromise.uncancellable(); + Supplier> sessionSupplier = () -> { repairSubmitted.set(true); return repairFuture; }; // coordinator sends prepare requests to create local session and perform anticompaction - AtomicBoolean hasFailures = new AtomicBoolean(false); Assert.assertFalse(repairSubmitted.get()); // execute repair and start prepare phase - Future sessionResult = coordinator.execute(sessionSupplier, hasFailures); + Future sessionResult = coordinator.execute(sessionSupplier); Assert.assertFalse(sessionResult.isDone()); - Assert.assertFalse(hasFailures.get()); + // prepare completed prepareLatch.countDown(); spyPrepare.interceptMessageOut(3).get(1, TimeUnit.SECONDS); Assert.assertFalse(sessionResult.isDone()); - Assert.assertFalse(hasFailures.get()); // set result from local repair session - repairFuture.trySuccess(Lists.newArrayList(createResult(coordinator), createResult(coordinator), createResult(coordinator))); + repairFuture.trySuccess(CoordinatedRepairResult.success(Lists.newArrayList(createResult(coordinator), createResult(coordinator), createResult(coordinator)))); // finalize phase finalizeLatch.countDown(); @@ -136,8 +135,7 @@ public void testMockedMessagingHappyPath() throws InterruptedException, Executio // commit phase spyCommit.interceptMessageOut(3).get(1, TimeUnit.SECONDS); - Assert.assertTrue(sessionResult.get()); - Assert.assertFalse(hasFailures.get()); + Assert.assertFalse(sessionResult.get().hasFailed()); // expect no other messages except from intercepted so far spyPrepare.interceptNoMsg(100, TimeUnit.MILLISECONDS); @@ -197,19 +195,18 @@ private void testMockedMessagingPrepareFailure(CountDownLatch prepareLatch) thro UUID uuid = registerSession(cfs, true, true); CoordinatorSession coordinator = ActiveRepairService.instance.consistent.coordinated.registerSession(uuid, PARTICIPANTS, false); AtomicBoolean repairSubmitted = new AtomicBoolean(false); - Promise> repairFuture = AsyncPromise.uncancellable(); - Supplier>> sessionSupplier = () -> + Promise repairFuture = AsyncPromise.uncancellable(); + Supplier> sessionSupplier = () -> { repairSubmitted.set(true); return repairFuture; }; // coordinator sends prepare requests to create local session and perform anticompaction - AtomicBoolean proposeFailed = new AtomicBoolean(false); Assert.assertFalse(repairSubmitted.get()); // execute repair and start prepare phase - Future sessionResult = coordinator.execute(sessionSupplier, proposeFailed); + Future sessionResult = coordinator.execute(sessionSupplier); prepareLatch.countDown(); // prepare completed try @@ -222,7 +219,8 @@ private void testMockedMessagingPrepareFailure(CountDownLatch prepareLatch) thro } sendFailSessionExpectedSpy.interceptMessageOut(3).get(1, TimeUnit.SECONDS); Assert.assertFalse(repairSubmitted.get()); - Assert.assertTrue(proposeFailed.get()); + Assert.assertTrue(sessionResult.isDone()); + Assert.assertNotNull(sessionResult.cause()); Assert.assertEquals(ConsistentSession.State.FAILED, coordinator.getState()); Assert.assertFalse(ActiveRepairService.instance.consistent.local.isSessionInProgress(uuid)); } @@ -236,19 +234,18 @@ public void testMockedMessagingPrepareTimeout() throws InterruptedException, Exe UUID uuid = registerSession(cfs, true, true); CoordinatorSession coordinator = ActiveRepairService.instance.consistent.coordinated.registerSession(uuid, PARTICIPANTS, false); AtomicBoolean repairSubmitted = new AtomicBoolean(false); - Promise> repairFuture = 
AsyncPromise.uncancellable(); - Supplier>> sessionSupplier = () -> + Promise repairFuture = AsyncPromise.uncancellable(); + Supplier> sessionSupplier = () -> { repairSubmitted.set(true); return repairFuture; }; // coordinator sends prepare requests to create local session and perform anticompaction - AtomicBoolean hasFailures = new AtomicBoolean(false); Assert.assertFalse(repairSubmitted.get()); // execute repair and start prepare phase - Future sessionResult = coordinator.execute(sessionSupplier, hasFailures); + Future sessionResult = coordinator.execute(sessionSupplier); try { sessionResult.get(1, TimeUnit.SECONDS); @@ -266,7 +263,6 @@ public void testMockedMessagingPrepareTimeout() throws InterruptedException, Exe spyPrepare.expectMockedMessage(2).get(100, TimeUnit.MILLISECONDS); sendFailSessionUnexpectedSpy.interceptNoMsg(100, TimeUnit.MILLISECONDS); Assert.assertFalse(repairSubmitted.get()); - Assert.assertFalse(hasFailures.get()); Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState()); Assert.assertFalse(ActiveRepairService.instance.consistent.local.isSessionInProgress(uuid)); } diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java index 42453d44a5d8..0421ad392ad8 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java @@ -19,6 +19,8 @@ package org.apache.cassandra.repair.consistent; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -28,16 +30,15 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; - -import org.apache.cassandra.utils.concurrent.AsyncPromise; -import org.apache.cassandra.utils.concurrent.Future; -import org.apache.cassandra.utils.concurrent.Promise; import org.junit.Assert; import org.junit.Test; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; import org.apache.cassandra.repair.AbstractRepairTest; +import org.apache.cassandra.repair.CoordinatedRepairResult; import org.apache.cassandra.repair.RepairSessionResult; import org.apache.cassandra.repair.messages.FailSession; import org.apache.cassandra.repair.messages.FinalizeCommit; @@ -45,8 +46,15 @@ import org.apache.cassandra.repair.messages.PrepareConsistentRequest; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.Promise; -import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*; +import static org.apache.cassandra.repair.consistent.ConsistentSession.State.FAILED; +import static org.apache.cassandra.repair.consistent.ConsistentSession.State.FINALIZE_PROMISED; +import static org.apache.cassandra.repair.consistent.ConsistentSession.State.PREPARED; +import static org.apache.cassandra.repair.consistent.ConsistentSession.State.PREPARING; +import static org.apache.cassandra.repair.consistent.ConsistentSession.State.REPAIRING; public class CoordinatorSessionTest extends AbstractRepairTest { @@ -210,18 +218,17 @@ public void successCase() { InstrumentedCoordinatorSession 
coordinator = createInstrumentedSession(); AtomicBoolean repairSubmitted = new AtomicBoolean(false); - Promise> repairFuture = AsyncPromise.uncancellable(); - Supplier>> sessionSupplier = () -> + Promise repairFuture = AsyncPromise.uncancellable(); + Supplier> sessionSupplier = () -> { repairSubmitted.set(true); return repairFuture; }; // coordinator sends prepare requests to create local session and perform anticompaction - AtomicBoolean hasFailures = new AtomicBoolean(false); Assert.assertFalse(repairSubmitted.get()); Assert.assertTrue(coordinator.sentMessages.isEmpty()); - Future sessionResult = coordinator.execute(sessionSupplier, hasFailures); + Future sessionResult = coordinator.execute(sessionSupplier); for (InetAddressAndPort participant : PARTICIPANTS) { @@ -253,7 +260,7 @@ public void successCase() createResult(coordinator)); coordinator.sentMessages.clear(); - repairFuture.trySuccess(results); + repairFuture.trySuccess(CoordinatedRepairResult.success(results)); // propose messages should have been sent once all repair sessions completed successfully for (InetAddressAndPort participant : PARTICIPANTS) @@ -286,7 +293,7 @@ public void successCase() } Assert.assertTrue(sessionResult.isDone()); - Assert.assertFalse(hasFailures.get()); + sessionResult.syncUninterruptibly(); } @Test @@ -294,18 +301,17 @@ public void failedRepairs() { InstrumentedCoordinatorSession coordinator = createInstrumentedSession(); AtomicBoolean repairSubmitted = new AtomicBoolean(false); - Promise> repairFuture = AsyncPromise.uncancellable(); - Supplier>> sessionSupplier = () -> + Promise repairFuture = AsyncPromise.uncancellable(); + Supplier> sessionSupplier = () -> { repairSubmitted.set(true); return repairFuture; }; // coordinator sends prepare requests to create local session and perform anticompaction - AtomicBoolean hasFailures = new AtomicBoolean(false); Assert.assertFalse(repairSubmitted.get()); Assert.assertTrue(coordinator.sentMessages.isEmpty()); - Future sessionResult = coordinator.execute(sessionSupplier, hasFailures); + Future sessionResult = coordinator.execute(sessionSupplier); for (InetAddressAndPort participant : PARTICIPANTS) { PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS)); @@ -330,6 +336,7 @@ public void failedRepairs() Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState()); + List>> ranges = Arrays.asList(coordinator.ranges, coordinator.ranges, coordinator.ranges); ArrayList results = Lists.newArrayList(createResult(coordinator), null, createResult(coordinator)); @@ -337,7 +344,7 @@ public void failedRepairs() coordinator.sentMessages.clear(); Assert.assertFalse(coordinator.failCalled); coordinator.onFail = () -> Assert.assertEquals(REPAIRING, coordinator.getState()); - repairFuture.trySuccess(results); + repairFuture.trySuccess(CoordinatedRepairResult.create(ranges, results)); Assert.assertTrue(coordinator.failCalled); // all participants should have been notified of session failure @@ -348,7 +355,7 @@ public void failedRepairs() } Assert.assertTrue(sessionResult.isDone()); - Assert.assertTrue(hasFailures.get()); + Assert.assertNotNull(sessionResult.cause()); } @Test @@ -356,18 +363,17 @@ public void failedPrepare() { InstrumentedCoordinatorSession coordinator = createInstrumentedSession(); AtomicBoolean repairSubmitted = new AtomicBoolean(false); - Promise> repairFuture = AsyncPromise.uncancellable(); - Supplier>> sessionSupplier = () -> + Promise repairFuture = 
AsyncPromise.uncancellable(); + Supplier> sessionSupplier = () -> { repairSubmitted.set(true); return repairFuture; }; // coordinator sends prepare requests to create local session and perform anticompaction - AtomicBoolean hasFailures = new AtomicBoolean(false); Assert.assertFalse(repairSubmitted.get()); Assert.assertTrue(coordinator.sentMessages.isEmpty()); - Future sessionResult = coordinator.execute(sessionSupplier, hasFailures); + Future sessionResult = coordinator.execute(sessionSupplier); for (InetAddressAndPort participant : PARTICIPANTS) { PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS)); @@ -414,7 +420,7 @@ public void failedPrepare() Assert.assertFalse(coordinator.sentMessages.containsKey(PARTICIPANT2)); Assert.assertTrue(sessionResult.isDone()); - Assert.assertTrue(hasFailures.get()); + Assert.assertNotNull(sessionResult.cause()); } @Test @@ -422,18 +428,17 @@ public void failedPropose() { InstrumentedCoordinatorSession coordinator = createInstrumentedSession(); AtomicBoolean repairSubmitted = new AtomicBoolean(false); - Promise> repairFuture = AsyncPromise.uncancellable(); - Supplier>> sessionSupplier = () -> + Promise repairFuture = AsyncPromise.uncancellable(); + Supplier> sessionSupplier = () -> { repairSubmitted.set(true); return repairFuture; }; // coordinator sends prepare requests to create local session and perform anticompaction - AtomicBoolean hasFailures = new AtomicBoolean(false); Assert.assertFalse(repairSubmitted.get()); Assert.assertTrue(coordinator.sentMessages.isEmpty()); - Future sessionResult = coordinator.execute(sessionSupplier, hasFailures); + Future sessionResult = coordinator.execute(sessionSupplier); for (InetAddressAndPort participant : PARTICIPANTS) { @@ -465,7 +470,7 @@ public void failedPropose() createResult(coordinator)); coordinator.sentMessages.clear(); - repairFuture.trySuccess(results); + repairFuture.trySuccess(CoordinatedRepairResult.success(results)); // propose messages should have been sent once all repair sessions completed successfully for (InetAddressAndPort participant : PARTICIPANTS) @@ -501,6 +506,6 @@ public void failedPropose() } Assert.assertTrue(sessionResult.isDone()); - Assert.assertTrue(hasFailures.get()); + Assert.assertNotNull(sessionResult.cause()); } } diff --git a/test/unit/org/apache/cassandra/utils/concurrent/AbstractTestAsyncPromise.java b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTestAsyncPromise.java index e1c069b93186..0c297ecd6637 100644 --- a/test/unit/org/apache/cassandra/utils/concurrent/AbstractTestAsyncPromise.java +++ b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTestAsyncPromise.java @@ -35,9 +35,15 @@ import org.junit.Assert; import io.netty.util.concurrent.GenericFutureListener; +import org.apache.cassandra.config.DatabaseDescriptor; public abstract class AbstractTestAsyncPromise extends AbstractTestPromise { + static + { + DatabaseDescriptor.clientInitialization(); + } + public static Promise cancelSuccess(Promise promise) { success(promise, Promise::isCancellable, true); @@ -117,20 +123,30 @@ public Function> getAsyncFunction() int id = count++; return result -> { results.add(result); order.add(id); return ImmediateFuture.success(result); }; } + public Function getFunction() + { + int id = count++; + return result -> { results.add(result); order.add(id); return result; }; + } public Function> getRecursiveAsyncFunction(Promise promise) { int id = count++; - return result -> { 
promise.andThenAsync(getAsyncFunction()); results.add(result); order.add(id); return ImmediateFuture.success(result); }; + return result -> { promise.flatMap(getAsyncFunction()); results.add(result); order.add(id); return ImmediateFuture.success(result); }; } public Function> getAsyncFailingFunction() { int id = count++; return result -> { results.add(result); order.add(id); return ImmediateFuture.failure(new RuntimeException()); }; } + public Function getFailingFunction() + { + int id = count++; + return result -> { results.add(result); order.add(id); throw new RuntimeException(); }; + } public Function> getRecursiveAsyncFailingFunction(Promise promise) { int id = count++; - return result -> { promise.andThenAsync(getAsyncFailingFunction()); results.add(result); order.add(id); return ImmediateFuture.failure(new RuntimeException()); }; + return result -> { promise.flatMap(getAsyncFailingFunction()); results.add(result); order.add(id); return ImmediateFuture.failure(new RuntimeException()); }; } public FutureCallback getCallback(Future p) { @@ -181,17 +197,26 @@ public void onFailure(Throwable throwable) promise.addListener(listeners.getRecursiveRunnable(promise), MoreExecutors.directExecutor()); promise.addListener(listeners.getRecursive()); promise.addCallback(listeners.getCallback(promise)); + promise.addCallback(listeners.getCallback(promise), MoreExecutors.directExecutor()); promise.addCallback(listeners.getRecursiveCallback(promise)); + promise.addCallback(listeners.getRecursiveCallback(promise), MoreExecutors.directExecutor()); promise.addCallback(listeners.getConsumer(), fail -> Assert.fail()); + promise.addCallback(listeners.getConsumer(), fail -> Assert.fail(), MoreExecutors.directExecutor()); promise.addCallback(listeners.getRecursiveConsumer(), fail -> Assert.fail()); - promise.andThenAsync(listeners.getAsyncFunction()).addListener(listeners.get()); - promise.andThenAsync(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get()); - promise.andThenAsync(listeners.getRecursiveAsyncFunction(promise)).addListener(listeners.get()); - promise.andThenAsync(listeners.getRecursiveAsyncFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.get()); - promise.andThenAsync(listeners.getAsyncFailingFunction()).addListener(listeners.getListenerToFailure(promise)); - promise.andThenAsync(listeners.getAsyncFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise)); - promise.andThenAsync(listeners.getRecursiveAsyncFailingFunction(promise)).addListener(listeners.getListenerToFailure(promise)); - promise.andThenAsync(listeners.getRecursiveAsyncFailingFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise)); + promise.addCallback(listeners.getRecursiveConsumer(), fail -> Assert.fail(), MoreExecutors.directExecutor()); + promise.map(listeners.getFunction()).addListener(listeners.get()); + promise.map(listeners.getFunction(), MoreExecutors.directExecutor()).addListener(listeners.get()); + promise.map(listeners.getFailingFunction()).addListener(listeners.getListenerToFailure(promise)); + promise.map(listeners.getFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise)); + promise.flatMap(listeners.getAsyncFunction()).addListener(listeners.get()); + promise.flatMap(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get()); + 
promise.flatMap(listeners.getRecursiveAsyncFunction(promise)).addListener(listeners.get()); + promise.flatMap(listeners.getRecursiveAsyncFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.get()); + promise.flatMap(listeners.getAsyncFailingFunction()).addListener(listeners.getListenerToFailure(promise)); + promise.flatMap(listeners.getAsyncFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise)); + promise.flatMap(listeners.getRecursiveAsyncFailingFunction(promise)).addListener(listeners.getListenerToFailure(promise)); + promise.flatMap(listeners.getRecursiveAsyncFailingFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise)); + success(promise, Promise::getNow, null); success(promise, Promise::isSuccess, false); success(promise, Promise::isDone, false); @@ -224,17 +249,25 @@ public void onFailure(Throwable throwable) promise.addListener(listeners.getRecursiveRunnable(promise), MoreExecutors.directExecutor()); promise.addListener(listeners.getRecursive()); promise.addCallback(listeners.getCallback(promise)); + promise.addCallback(listeners.getCallback(promise), MoreExecutors.directExecutor()); promise.addCallback(listeners.getRecursiveCallback(promise)); + promise.addCallback(listeners.getRecursiveCallback(promise), MoreExecutors.directExecutor()); promise.addCallback(listeners.getConsumer(), fail -> Assert.fail()); + promise.addCallback(listeners.getConsumer(), fail -> Assert.fail(), MoreExecutors.directExecutor()); promise.addCallback(listeners.getRecursiveConsumer(), fail -> Assert.fail()); - promise.andThenAsync(listeners.getAsyncFunction()).addListener(listeners.get()); - promise.andThenAsync(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get()); - promise.andThenAsync(listeners.getRecursiveAsyncFunction(promise)).addListener(listeners.get()); - promise.andThenAsync(listeners.getRecursiveAsyncFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.get()); - promise.andThenAsync(listeners.getAsyncFailingFunction()).addListener(listeners.getListenerToFailure(promise)); - promise.andThenAsync(listeners.getAsyncFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise)); - promise.andThenAsync(listeners.getRecursiveAsyncFailingFunction(promise)).addListener(listeners.getListenerToFailure(promise)); - promise.andThenAsync(listeners.getRecursiveAsyncFailingFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise)); + promise.addCallback(listeners.getRecursiveConsumer(), fail -> Assert.fail(), MoreExecutors.directExecutor()); + promise.map(listeners.getFunction()).addListener(listeners.get()); + promise.map(listeners.getFunction(), MoreExecutors.directExecutor()).addListener(listeners.get()); + promise.map(listeners.getFailingFunction()).addListener(listeners.getListenerToFailure(promise)); + promise.map(listeners.getFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise)); + promise.flatMap(listeners.getAsyncFunction()).addListener(listeners.get()); + promise.flatMap(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get()); + promise.flatMap(listeners.getRecursiveAsyncFunction(promise)).addListener(listeners.get()); + promise.flatMap(listeners.getRecursiveAsyncFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.get()); + 
promise.flatMap(listeners.getAsyncFailingFunction()).addListener(listeners.getListenerToFailure(promise)); + promise.flatMap(listeners.getAsyncFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise)); + promise.flatMap(listeners.getRecursiveAsyncFailingFunction(promise)).addListener(listeners.getListenerToFailure(promise)); + promise.flatMap(listeners.getRecursiveAsyncFailingFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise)); success(promise, Promise::isSuccess, true); success(promise, Promise::isDone, true); success(promise, Promise::isCancelled, false); @@ -292,7 +325,7 @@ public Function> getAsyncFunction() } public Function> getRecursiveAsyncFunction() { - return result -> { promise.andThenAsync(getAsyncFunction()); return ImmediateFuture.success(result); }; + return result -> { promise.flatMap(getAsyncFunction()); return ImmediateFuture.success(result); }; } public FutureCallback getCallback(Future p) { @@ -346,10 +379,10 @@ public void onFailure(Throwable throwable) promise.addCallback(listeners.getRecursiveCallback(promise)); promise.addCallback(fail -> Assert.fail(), listeners.getConsumer()); promise.addCallback(fail -> Assert.fail(), listeners.getRecursiveConsumer()); - promise.andThenAsync(listeners.getAsyncFunction()).addListener(listeners.get()); - promise.andThenAsync(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get()); - promise.andThenAsync(listeners.getRecursiveAsyncFunction()).addListener(listeners.get()); - promise.andThenAsync(listeners.getRecursiveAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get()); + promise.flatMap(listeners.getAsyncFunction()).addListener(listeners.get()); + promise.flatMap(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get()); + promise.flatMap(listeners.getRecursiveAsyncFunction()).addListener(listeners.get()); + promise.flatMap(listeners.getRecursiveAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get()); success(promise, Promise::isSuccess, false); success(promise, Promise::isDone, false); success(promise, Promise::isCancelled, false); @@ -393,8 +426,8 @@ public void onFailure(Throwable throwable) promise.addCallback(listeners.getRecursiveCallback(promise)); promise.addCallback(fail -> Assert.fail(), listeners.getConsumer()); promise.addCallback(fail -> Assert.fail(), listeners.getRecursiveConsumer()); - promise.andThenAsync(listeners.getAsyncFunction()).addListener(listeners.get()); - promise.andThenAsync(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get()); + promise.flatMap(listeners.getAsyncFunction()).addListener(listeners.get()); + promise.flatMap(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get()); success(promise, Promise::isSuccess, false); success(promise, Promise::isDone, true); success(promise, Promise::isCancelled, false);
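The coordinator and messaging tests above complete their repair futures with CoordinatedRepairResult.success(...) and assert on hasFailed(). A much-simplified stand-in that only models that pass/fail contract (the real class in this patch also tracks the repaired ranges and more state):

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public final class CoordinatedResultSketch<T>
{
    private final Optional<List<T>> results;

    private CoordinatedResultSketch(Optional<List<T>> results)
    {
        this.results = results;
    }

    // every session produced a non-null result
    public static <T> CoordinatedResultSketch<T> success(List<T> results)
    {
        return new CoordinatedResultSketch<>(Optional.of(results));
    }

    // results may be absent or contain nulls for sessions that never completed
    public static <T> CoordinatedResultSketch<T> create(List<T> results)
    {
        return new CoordinatedResultSketch<>(Optional.ofNullable(results));
    }

    public boolean hasFailed()
    {
        // mirrors the old inline check: missing, empty, or null-containing results count as failure
        return !results.isPresent() || results.get().isEmpty() || results.get().stream().anyMatch(r -> r == null);
    }

    public static void main(String[] args)
    {
        System.out.println(success(Arrays.asList("a", "b")).hasFailed()); // false
        System.out.println(create(Arrays.asList("a", null)).hasFailed()); // true: one session had no result
        System.out.println(create(null).hasFailed());                     // true: repair never produced results
    }
}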