Skip to content

Commit

Permalink
nodetool compact should support using a key string to find the range …
Browse files Browse the repository at this point in the history
…to avoid operators having to manually do this

patch by David Capwell; reviewed by Marcus Eriksson for CASSANDRA-17537
  • Loading branch information
dcapwell committed Apr 22, 2022
1 parent 003a96b commit 2b90ac1
Show file tree
Hide file tree
Showing 12 changed files with 274 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.1
* nodetool compact should support using a key string to find the range to avoid operators having to manually do this (CASSANDRA-17537)
* Add guardrail for data disk usage (CASSANDRA-17150)
* Tool to list data paths of existing tables (CASSANDRA-17568)
* Migrate track_warnings to more standard naming conventions and use latest configuration types rather than long (CASSANDRA-17560)
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Expand Up @@ -2365,6 +2365,11 @@ static Set<Range<Token>> toTokenRanges(IPartitioner partitioner, String... strin
return tokenRanges;
}

public void forceCompactionForKey(DecoratedKey key)
{
CompactionManager.instance.forceCompactionForKey(this, key);
}

public static Iterable<ColumnFamilyStore> all()
{
List<Iterable<ColumnFamilyStore>> stores = new ArrayList<>(Schema.instance.getKeyspaces().size());
Expand Down
Expand Up @@ -34,7 +34,6 @@
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.utils.AlwaysPresentFilter;
import org.apache.cassandra.utils.OverlapIterator;
import org.apache.cassandra.utils.concurrent.Refs;

Expand Down Expand Up @@ -255,10 +254,7 @@ public LongPredicate getPurgeEvaluator(DecoratedKey key)

for (SSTableReader sstable: filteredSSTables)
{
// if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
// we check index file instead.
if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null
|| sstable.getBloomFilter().isPresent(key))
if (sstable.maybePresent(key))
{
minTimestampSeen = Math.min(minTimestampSeen, sstable.getMinTimestamp());
hasTimestamp = true;
Expand Down
30 changes: 27 additions & 3 deletions src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
Expand Down Expand Up @@ -928,10 +929,10 @@ protected void runMayThrow()
return futures;
}

public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, Collection<Range<Token>> ranges)
public void forceCompaction(ColumnFamilyStore cfStore, Supplier<Collection<SSTableReader>> sstablesFn, com.google.common.base.Predicate<SSTableReader> sstablesPredicate)
{
Callable<CompactionTasks> taskCreator = () -> {
Collection<SSTableReader> sstables = sstablesInBounds(cfStore, ranges);
Collection<SSTableReader> sstables = sstablesFn.get();
if (sstables == null || sstables.isEmpty())
{
logger.debug("No sstables found for the provided token range");
Expand All @@ -941,7 +942,7 @@ public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, Collection<R
};

try (CompactionTasks tasks = cfStore.runWithCompactionsDisabled(taskCreator,
(sstable) -> new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges),
sstablesPredicate,
false,
false,
false))
Expand All @@ -963,6 +964,11 @@ protected void runMayThrow()
}
}

public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, Collection<Range<Token>> ranges)
{
forceCompaction(cfStore, () -> sstablesInBounds(cfStore, ranges), (sstable) -> new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges));
}

private static Collection<SSTableReader> sstablesInBounds(ColumnFamilyStore cfs, Collection<Range<Token>> tokenRangeCollection)
{
final Set<SSTableReader> sstables = new HashSet<>();
Expand Down Expand Up @@ -990,6 +996,24 @@ private static Collection<SSTableReader> sstablesInBounds(ColumnFamilyStore cfs,
return sstables;
}

public void forceCompactionForKey(ColumnFamilyStore cfStore, DecoratedKey key)
{
forceCompaction(cfStore, () -> sstablesWithKey(cfStore, key), sstable -> sstable.maybePresent(key));
}

private static Collection<SSTableReader> sstablesWithKey(ColumnFamilyStore cfs, DecoratedKey key)
{
final Set<SSTableReader> sstables = new HashSet<>();
Iterable<SSTableReader> liveTables = cfs.getTracker().getView().liveSSTablesInBounds(key.getToken().minKeyBound(),
key.getToken().maxKeyBound());
for (SSTableReader sstable : liveTables)
{
if (sstable.maybePresent(key))
sstables.add(sstable);
}
return sstables.isEmpty() ? Collections.emptyList() : sstables;
}

public void forceUserDefinedCompaction(String dataFiles)
{
String[] filenames = dataFiles.split(",");
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
Expand Up @@ -214,6 +214,11 @@ public LongToken decreaseSlightly()
return new LongToken(token - 1);
}

public static ByteBuffer keyForToken(long token)
{
return keyForToken(new LongToken(token));
}

/**
* Reverses murmur3 to find a possible 16 byte key that generates a given token
*/
Expand Down
Expand Up @@ -2003,6 +2003,13 @@ public void addTo(Ref.IdentityCollection identities)

}

public boolean maybePresent(DecoratedKey key)
{
// if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
// we check index file instead.
return bf instanceof AlwaysPresentFilter && getPosition(key, Operator.EQ, false) != null || bf.isPresent(key);
}

/**
* One instance per SSTableReader we create.
*
Expand Down
49 changes: 45 additions & 4 deletions src/java/org/apache/cassandra/service/StorageService.java
Expand Up @@ -3914,6 +3914,7 @@ public void takeTableSnapshot(String keyspaceName, String tableName, String tag)
takeMultipleTableSnapshot(tag, false, null, keyspaceName + "." + tableName);
}

@Override
public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String startToken, String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
Collection<Range<Token>> tokenRanges = createRepairRangeFrom(startToken, endToken);
Expand All @@ -3924,6 +3925,30 @@ public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String sta
}
}

@Override
public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, String partitionKey, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
// validate that the key parses before attempting compaction
for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, tableNames))
{
try
{
getKeyFromPartition(keyspaceName, cfStore.name, partitionKey);
}
catch (Exception e)
{
// JMX can not handle exceptions defined outside of java.* and javax.*, so safer to rewrite the exception
IllegalArgumentException exception = new IllegalArgumentException(String.format("Unable to parse partition key '%s' for table %s; %s", partitionKey, cfStore.metadata, e.getMessage()));
exception.setStackTrace(e.getStackTrace());
throw exception;
}
}
for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, tableNames))
{
cfStore.forceCompactionForKey(getKeyFromPartition(keyspaceName, cfStore.name, partitionKey));
}
}

/**
* Takes the snapshot for the given keyspaces. A snapshot name must be specified.
*
Expand Down Expand Up @@ -4572,6 +4597,22 @@ public List<String> getNaturalEndpointsWithPort(String keyspaceName, ByteBuffer
}

public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, String cf, String key)
{
return getNaturalReplicasForToken(keyspaceName, partitionKeyToBytes(keyspaceName, cf, key));
}

public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, ByteBuffer key)
{
Token token = tokenMetadata.partitioner.getToken(key);
return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(token);
}

public DecoratedKey getKeyFromPartition(String keyspaceName, String table, String partitionKey)
{
return tokenMetadata.partitioner.decorateKey(partitionKeyToBytes(keyspaceName, table, partitionKey));
}

private static ByteBuffer partitionKeyToBytes(String keyspaceName, String cf, String key)
{
KeyspaceMetadata ksMetaData = Schema.instance.getKeyspaceMetadata(keyspaceName);
if (ksMetaData == null)
Expand All @@ -4581,13 +4622,13 @@ public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, String
if (metadata == null)
throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'");

return getNaturalReplicasForToken(keyspaceName, metadata.partitionKeyType.fromString(key));
return metadata.partitionKeyType.fromString(key);
}

public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, ByteBuffer key)
@Override
public String getToken(String keyspaceName, String table, String key)
{
Token token = tokenMetadata.partitioner.getToken(key);
return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(token);
return tokenMetadata.partitioner.getToken(partitionKeyToBytes(keyspaceName, table, key)).toString();
}

public void setLoggingLevel(String classQualifier, String rawLevel) throws Exception
Expand Down
Expand Up @@ -330,6 +330,11 @@ public interface StorageServiceMBean extends NotificationEmitter
*/
public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String startToken, String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException;

/**
* Forces major compactions for the range represented by the partition key
*/
public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, String partitionKey, String... tableNames) throws IOException, ExecutionException, InterruptedException;

/**
* Trigger a cleanup of keys on a single keyspace
*/
Expand Down Expand Up @@ -984,4 +989,6 @@ public void enableAuditLog(String loggerName, String includedKeyspaces, String e
public void clearPaxosRepairs();
public void setSkipPaxosRepairCompatibilityCheck(boolean v);
public boolean getSkipPaxosRepairCompatibilityCheck();

String getToken(String keyspaceName, String table, String partitionKey);
}
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/tools/NodeProbe.java
Expand Up @@ -441,6 +441,11 @@ public void forceKeyspaceCompactionForTokenRange(String keyspaceName, final Stri
ssProxy.forceKeyspaceCompactionForTokenRange(keyspaceName, startToken, endToken, tableNames);
}

public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, String partitionKey, String... tableNames) throws InterruptedException, ExecutionException, IOException
{
ssProxy.forceKeyspaceCompactionForPartitionKey(keyspaceName, partitionKey, tableNames);
}

public void forceKeyspaceFlush(String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
ssProxy.forceKeyspaceFlush(keyspaceName, tableNames);
Expand Down
13 changes: 11 additions & 2 deletions src/java/org/apache/cassandra/tools/nodetool/Compact.java
Expand Up @@ -47,11 +47,16 @@ public class Compact extends NodeToolCmd
@Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which compaction range ends")
private String endToken = EMPTY;

@Option(title = "partition_key", name = {"--partition"}, description = "String representation of the partition key")
private String partitionKey = EMPTY;


@Override
public void execute(NodeProbe probe)
{
final boolean tokenProvided = !(startToken.isEmpty() && endToken.isEmpty());
final boolean startEndTokenProvided = !(startToken.isEmpty() && endToken.isEmpty());
final boolean partitionKeyProvided = !partitionKey.isEmpty();
final boolean tokenProvided = startEndTokenProvided || partitionKeyProvided;
if (splitOutput && (userDefined || tokenProvided))
{
throw new RuntimeException("Invalid option combination: Can not use split-output here");
Expand Down Expand Up @@ -80,10 +85,14 @@ public void execute(NodeProbe probe)
{
try
{
if (tokenProvided)
if (startEndTokenProvided)
{
probe.forceKeyspaceCompactionForTokenRange(keyspace, startToken, endToken, tableNames);
}
else if (partitionKeyProvided)
{
probe.forceKeyspaceCompactionForPartitionKey(keyspace, partitionKey, tableNames);
}
else
{
probe.forceKeyspaceCompaction(splitOutput, keyspace, tableNames);
Expand Down
53 changes: 53 additions & 0 deletions test/unit/org/apache/cassandra/tools/ToolRunner.java
Expand Up @@ -34,6 +34,7 @@
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
Expand All @@ -45,8 +46,11 @@
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.utils.Pair;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.assertj.core.util.Strings;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -455,6 +459,55 @@ public void assertOnCleanExit()
assertOnExitCode();
assertCleanStdErr();
}

public AssertHelp asserts()
{
return new AssertHelp();
}

public final class AssertHelp
{
public AssertHelp success()
{
if (exitCode != 0)
fail("was not successful");
return this;
}

public AssertHelp failure()
{
if (exitCode == 0)
fail("was not successful");
return this;
}

public AssertHelp errorContains(String messages)
{
return errorContainsAny(messages);
}

public AssertHelp errorContainsAny(String... messages)
{
assertThat(messages).hasSizeGreaterThan(0);
assertThat(stderr).isNotNull();
if (!Stream.of(messages).anyMatch(stderr::contains))
fail("stderr does not contain " + Arrays.toString(messages));
return this;
}

private void fail(String msg)
{
StringBuilder sb = new StringBuilder();
sb.append("nodetool command ").append(String.join(" ", allArgs)).append(": ").append(msg).append('\n');
if (stdout != null)
sb.append("stdout:\n").append(stdout).append('\n');
if (stderr != null)
sb.append("stderr:\n").append(stderr).append('\n');
if (e != null)
sb.append("Exception:\n").append(Throwables.getStackTraceAsString(e)).append('\n');
throw new AssertionError(sb.toString());
}
}
}

public interface ObservableTool extends AutoCloseable
Expand Down

0 comments on commit 2b90ac1

Please sign in to comment.