Navigation Menu

Skip to content

Commit

Permalink
Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs
Browse files Browse the repository at this point in the history

patch by David Capwell; reviewed by Marcus Eriksson for CASSANDRA-17069
  • Loading branch information
dcapwell committed Nov 13, 2021
1 parent 092bb60 commit b84ec51
Show file tree
Hide file tree
Showing 35 changed files with 1,124 additions and 556 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.1
* Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs (CASSANDRA-17069)
* Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130)
* Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065)
* Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
Expand Down
22 changes: 19 additions & 3 deletions checkstyle.xml
Expand Up @@ -51,14 +51,30 @@

<module name="SuppressWithNearbyCommentFilter">
<property name="commentFormat" value="checkstyle: permit system clock"/>
<property name="checkFormat" value="RegexpSinglelineJava"/>
<property name="idFormat" value="blockSystemClock"/>
<property name="influenceFormat" value="0"/>
</module>

<module name="RegexpSinglelineJava">
<!-- To prevent static imports and System.nanoTime or System.currentTimeMillis -->
<property name="format" value="(newSingleThreadExecutor|newFixedThreadPool|newCachedThreadPool|newSingleThreadScheduledExecutor|newWorkStealingPool|newScheduledThreadPool|defaultThreadFactory)\(|System\.(currentTimeMillis|nanoTime)"/>
<!-- block system time -->
<property name="id" value="blockSystemClock"/>
<property name="format" value="System\.(currentTimeMillis|nanoTime)"/>
<property name="ignoreComments" value="true"/>
<property name="message" value="Avoid System for time, should use org.apache.cassandra.utils.Clock.Global or org.apache.cassandra.utils.Clock interface" />
</module>
<module name="RegexpSinglelineJava">
<!-- block normal executors -->
<property name="id" value="blockExecutors"/>
<property name="format" value="newSingleThreadExecutor|newFixedThreadPool|newCachedThreadPool|newSingleThreadScheduledExecutor|newWorkStealingPool|newScheduledThreadPool|defaultThreadFactory"/>
<property name="ignoreComments" value="true"/>
<property name="message" value="Avoid creating an executor directly, should use org.apache.cassandra.concurrent.ExecutorFactory.Global#executorFactory" />
</module>
<module name="RegexpSinglelineJava">
<!-- block guavas directExecutor -->
<property name="id" value="blockGuavaDirectExecutor"/>
<property name="format" value="MoreExecutors\.directExecutor"/>
<property name="ignoreComments" value="true"/>
<property name="message" value="Avoid MoreExecutors.directExecutor() in favor of ImmediateExecutor.INSTANCE" />
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value=""/>
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/cache/CaffeineCache.java
Expand Up @@ -27,6 +27,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Policy.Eviction;
import com.github.benmanes.caffeine.cache.Weigher;
import org.apache.cassandra.concurrent.ImmediateExecutor;

/**
* An adapter from a Caffeine cache to the ICache interface. This provides an on-heap cache using
Expand Down Expand Up @@ -54,7 +55,7 @@ public static <K extends IMeasurableMemory, V extends IMeasurableMemory> Caffein
Cache<K, V> cache = Caffeine.newBuilder()
.maximumWeight(weightedCapacity)
.weigher(weigher)
.executor(MoreExecutors.directExecutor())
.executor(ImmediateExecutor.INSTANCE)
.build();
return new CaffeineCache<>(cache);
}
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/cache/ChunkCache.java
Expand Up @@ -29,6 +29,7 @@
import com.google.common.util.concurrent.MoreExecutors;

import com.github.benmanes.caffeine.cache.*;
import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.util.*;
Expand Down Expand Up @@ -143,7 +144,7 @@ private ChunkCache(BufferPool pool)
metrics = new ChunkCacheMetrics(this);
cache = Caffeine.newBuilder()
.maximumWeight(cacheSize)
.executor(MoreExecutors.directExecutor())
.executor(ImmediateExecutor.INSTANCE)
.weigher((key, buffer) -> ((Buffer) buffer).buffer.capacity())
.removalListener(this)
.recordStats(() -> metrics)
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/cache/SerializingCache.java
Expand Up @@ -21,6 +21,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;

import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.util.MemoryInputStream;
import org.apache.cassandra.io.util.MemoryOutputStream;
Expand Down Expand Up @@ -51,7 +52,7 @@ private SerializingCache(long capacity, Weigher<K, RefCountedMemory> weigher, IS
this.cache = Caffeine.newBuilder()
.weigher(weigher)
.maximumWeight(capacity)
.executor(MoreExecutors.directExecutor())
.executor(ImmediateExecutor.INSTANCE)
.removalListener((key, mem, cause) -> {
if (cause.wasEvicted()) {
mem.unreference();
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/cql3/QueryProcessor.java
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.LoggerFactory;

import org.antlr.runtime.*;
import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.ClientRequestMetrics;
Expand Down Expand Up @@ -97,7 +98,7 @@ public class QueryProcessor implements QueryHandler
static
{
preparedStatements = Caffeine.newBuilder()
.executor(MoreExecutors.directExecutor())
.executor(ImmediateExecutor.INSTANCE)
.maximumWeight(capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMB()))
.weigher(QueryProcessor::measure)
.removalListener((key, prepared, cause) -> {
Expand Down
Expand Up @@ -372,7 +372,7 @@ public Future<List<Void>> run()
}

Future<List<AcquireResult>> acquisitionResults = FutureCombiner.successfulOf(tasks);
return acquisitionResults.andThenAsync(getAcquisitionCallback(prsId, tokenRanges));
return acquisitionResults.flatMap(getAcquisitionCallback(prsId, tokenRanges));
}

@VisibleForTesting
Expand Down
Expand Up @@ -24,6 +24,7 @@

import com.google.common.util.concurrent.MoreExecutors;

import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.tartarus.snowball.SnowballStemmer;
import org.tartarus.snowball.ext.*;

Expand All @@ -42,7 +43,7 @@ public class StemmerFactory
{
private static final Logger logger = LoggerFactory.getLogger(StemmerFactory.class);
private static final LoadingCache<Class, Constructor<?>> STEMMER_CONSTRUCTOR_CACHE = Caffeine.newBuilder()
.executor(MoreExecutors.directExecutor())
.executor(ImmediateExecutor.INSTANCE)
.build(new CacheLoader<Class, Constructor<?>>()
{
public Constructor<?> load(Class aClass) throws Exception
Expand Down
Expand Up @@ -32,6 +32,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.io.util.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -51,7 +52,7 @@ public class StopWordFactory
"pl","pt","ro","ru","sv"));

private static final LoadingCache<String, Set<String>> STOP_WORDS_CACHE = Caffeine.newBuilder()
.executor(MoreExecutors.directExecutor())
.executor(ImmediateExecutor.INSTANCE)
.build(StopWordFactory::getStopWordsFromResource);

public static Set<String> getStopWordsForLanguage(Locale locale)
Expand Down
Expand Up @@ -25,6 +25,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.UUIDGen;
Expand All @@ -44,12 +45,12 @@ public class HintedHandoffMetrics

/** Total number of hints which are not stored. This is not a cache. */
private final LoadingCache<InetAddressAndPort, DifferencingCounter> notStored = Caffeine.newBuilder()
.executor(MoreExecutors.directExecutor())
.executor(ImmediateExecutor.INSTANCE)
.build(DifferencingCounter::new);

/** Total number of hints that have been created. This is not a cache. */
private final LoadingCache<InetAddressAndPort, Counter> createdHintCounts = Caffeine.newBuilder()
.executor(MoreExecutors.directExecutor())
.executor(ImmediateExecutor.INSTANCE)
.build(address -> Metrics.counter(factory.createMetricName("Hints_created-" + address.toString().replace(':', '.'))));

public void incrCreatedHints(InetAddressAndPort address)
Expand Down
Expand Up @@ -25,6 +25,7 @@
import com.codahale.metrics.Meter;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.locator.InetAddressAndPort;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
Expand All @@ -47,7 +48,7 @@ public final class HintsServiceMetrics

/** Per-endpoint histograms of hint delivery delays. This is not a cache. */
private static final LoadingCache<InetAddressAndPort, Histogram> delayByEndpoint = Caffeine.newBuilder()
.executor(MoreExecutors.directExecutor())
.executor(ImmediateExecutor.INSTANCE)
.build(address -> Metrics.histogram(factory.createMetricName("Hint_delays-"+address.toString().replace(':', '.')), false));

public static void updateDelayMetrics(InetAddressAndPort endpoint, long delay)
Expand Down
122 changes: 122 additions & 0 deletions src/java/org/apache/cassandra/repair/AbstractRepairTask.java
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.repair;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;

/**
 * Base class for {@link RepairTask} implementations that submit one {@link RepairSession}
 * per {@link CommonRange} and fold the session outcomes into a {@link CoordinatedRepairResult}.
 * <p>
 * Progress and failures of each individual session are reported through the
 * {@link RepairNotifier} supplied at construction time.
 */
public abstract class AbstractRepairTask implements RepairTask
{
    protected static final Logger logger = LoggerFactory.getLogger(AbstractRepairTask.class);

    protected final RepairOption options;
    protected final String keyspace;
    protected final RepairNotifier notifier;

    /**
     * @param options  repair options consulted when submitting sessions; must not be null
     * @param keyspace keyspace the sessions are submitted for; must not be null
     * @param notifier receiver of per-session progress and error notifications; must not be null
     * @throws NullPointerException if any argument is null
     */
    protected AbstractRepairTask(RepairOption options, String keyspace, RepairNotifier notifier)
    {
        this.options = Objects.requireNonNull(options);
        this.keyspace = Objects.requireNonNull(keyspace);
        this.notifier = Objects.requireNonNull(notifier);
    }

    /**
     * Submits one repair session per common range through {@link ActiveRepairService} and
     * attaches a {@link RepairSessionCallback} to each session that was actually created.
     *
     * @return the created sessions, in the iteration order of {@code commonRanges}; ranges for
     *         which the service returned {@code null} contribute no entry
     */
    private List<RepairSession> submitRepairSessions(UUID parentSession,
                                                     boolean isIncremental,
                                                     ExecutorPlus executor,
                                                     List<CommonRange> commonRanges,
                                                     String... cfnames)
    {
        List<RepairSession> submitted = new ArrayList<>(options.getRanges().size());

        for (CommonRange range : commonRanges)
        {
            logger.info("Starting RepairSession for {}", range);
            RepairSession created = ActiveRepairService.instance.submitRepairSession(parentSession,
                                                                                     range,
                                                                                     keyspace,
                                                                                     options.getParallelism(),
                                                                                     isIncremental,
                                                                                     options.isPullRepair(),
                                                                                     options.getPreviewKind(),
                                                                                     options.optimiseStreams(),
                                                                                     executor,
                                                                                     cfnames);
            // the service may hand back no session for a range; such ranges are simply skipped
            if (created != null)
            {
                created.addCallback(new RepairSessionCallback(created));
                submitted.add(created);
            }
        }
        return submitted;
    }

    /**
     * Submits the repair sessions for the given ranges and returns a future of the
     * coordinated result.
     *
     * @param parentSession id of the parent repair session
     * @param isIncremental forwarded to each submitted session
     * @param executor      executor forwarded to each submitted session
     * @param commonRanges  ranges to submit a session for
     * @param cfnames       table names forwarded to each submitted session
     * @return a future that, once the combined session future yields its results, produces the
     *         {@link CoordinatedRepairResult} built from the session ranges and those results
     */
    protected Future<CoordinatedRepairResult> runRepair(UUID parentSession,
                                                        boolean isIncremental,
                                                        ExecutorPlus executor,
                                                        List<CommonRange> commonRanges,
                                                        String... cfnames)
    {
        List<RepairSession> sessions = submitRepairSessions(parentSession, isIncremental, executor, commonRanges, cfnames);
        // view of the ranges covered by each submitted session, index-aligned with the sessions
        List<Collection<Range<Token>>> sessionRanges = Lists.transform(sessions, RepairSession::ranges);
        Future<List<RepairSessionResult>> combined = FutureCombiner.successfulOf(sessions);
        return combined.map(sessionResults -> {
            logger.debug("Repair result: {}", sessionResults);
            return CoordinatedRepairResult.create(sessionRanges, sessionResults);
        });
    }

    /**
     * Relays the outcome of a single repair session to the enclosing task's notifier.
     */
    private class RepairSessionCallback implements FutureCallback<RepairSessionResult>
    {
        private final RepairSession session;

        public RepairSessionCallback(RepairSession session)
        {
            this.session = session;
        }

        /** Reports the finished session as a progress notification. */
        public void onSuccess(RepairSessionResult result)
        {
            notifier.notifyProgress(String.format("Repair session %s for range %s finished",
                                                  session.getId(),
                                                  session.ranges().toString()));
        }

        /** Reports the failed session as an error, keeping {@code t} as the cause. */
        public void onFailure(Throwable t)
        {
            String description = String.format("Repair session %s for range %s failed with error %s",
                                               session.getId(),
                                               session.ranges().toString(),
                                               t.getMessage());
            notifier.notifyError(new RuntimeException(description, t));
        }
    }
}

0 comments on commit b84ec51

Please sign in to comment.