Partial compaction can resurrect deleted data
patch by Tobias Lindaaker, Marcus Eriksson; reviewed by David Capwell, Marcus Eriksson for CASSANDRA-18507
Tobias Lindaaker authored and dcapwell committed May 18, 2023
1 parent 2bb634a commit 1053e3b
Showing 5 changed files with 340 additions and 9 deletions.
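The change in brief: when there is not enough disk space for a full compaction, CompactionTask drops the largest sstable from the compaction set (a "partial compaction"), but until this fix the CompactionController's view of which sstables overlap the compaction was never recomputed afterwards. A tombstone could then be purged even though the dropped sstable still held older data it shadowed, resurrecting deleted rows once the tombstone was gone. Below is a hedged, self-contained model of the purge rule involved; the class, method, and parameter names are illustrative, not the actual Cassandra internals.

import java.util.Map;
import java.util.Set;

// Hedged, self-contained model of the tombstone-purge rule this commit protects.
// Names are illustrative; the real check lives in the compaction controller.
public class PurgeRuleSketch
{
    // A tombstone may be purged during compaction only if no sstable OUTSIDE the
    // compaction overlaps its key with data old enough to be shadowed by it.
    // The bug: after the largest sstable was dropped from the compacting set,
    // the stale overlap set did not contain it either, so tombstones shadowing
    // its data were wrongly judged purgeable.
    static boolean mayPurgeTombstone(long tombstoneTimestamp,
                                     Set<String> compacting,
                                     Map<String, Long> overlappingMinTimestamps)
    {
        for (Map.Entry<String, Long> overlap : overlappingMinTimestamps.entrySet())
        {
            boolean outsideCompaction = !compacting.contains(overlap.getKey());
            boolean mayHoldShadowedData = overlap.getValue() <= tombstoneTimestamp;
            if (outsideCompaction && mayHoldShadowedData)
                return false; // purging now could resurrect that data later
        }
        return true;
    }
}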
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.0.10
+ * Partial compaction can resurrect deleted data (CASSANDRA-18507)
* Allow internal address to change with reconnecting snitches (CASSANDRA-16718)
* Fix quoting in toCqlString methods of UDTs and aggregates (CASSANDRA-17918)
* NPE when deserializing malformed collections from client (CASSANDRA-18505)
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -116,7 +116,7 @@ public void maybeRefreshOverlaps()
}
}

-    private void refreshOverlaps()
+    void refreshOverlaps()
{
if (NEVER_PURGE_TOMBSTONES || cfs.getNeverPurgeTombstones())
return;
25 changes: 17 additions & 8 deletions src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -83,13 +83,14 @@ public boolean reduceScopeForLimitedSpace(Set<SSTableReader> nonExpiredSSTables,
if (partialCompactionsAcceptable() && transaction.originals().size() > 1)
{
// Try again w/o the largest one.
logger.warn("insufficient space to compact all requested files. {}MB required, {} for compaction {}",
SSTableReader removedSSTable = cfs.getMaxSizeFile(nonExpiredSSTables);
logger.warn("insufficient space to compact all requested files. {}MB required, {} for compaction {} - removing largest SSTable: {}",
(float) expectedSize / 1024 / 1024,
StringUtils.join(transaction.originals(), ", "),
-                        transaction.opId());
+                        transaction.opId(),
+                        removedSSTable);
// Note that we have removed files that are still marked as compacting.
// This suboptimal but ok since the caller will unmark all the sstables at the end.
-            SSTableReader removedSSTable = cfs.getMaxSizeFile(nonExpiredSSTables);
transaction.cancel(removedSSTable);
return true;
}
@@ -123,7 +124,12 @@ protected void runMayThrow() throws Exception
final Set<SSTableReader> fullyExpiredSSTables = controller.getFullyExpiredSSTables();

// select SSTables to compact based on available disk space.
-            buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables);
+            if (!buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables))
+            {
+                // The set of sstables has changed (one or more were excluded due to limited available disk space).
+                // We need to recompute the overlaps between sstables.
+                controller.refreshOverlaps();
+            }

// sanity check: all sstables must belong to the same cfs
assert !Iterables.any(transaction.originals(), new Predicate<SSTableReader>()
@@ -345,13 +351,15 @@ public static boolean getIsTransient(Set<SSTableReader> sstables)
* Checks if we have enough disk space to execute the compaction. Drops the largest sstable out of the Task until
* there's enough space (in theory) to handle the compaction. Does not take into account space that will be taken by
* other compactions.
+     *
+     * @return true if there is enough disk space to execute the complete compaction, false if some sstables are excluded.
*/
-    protected void buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTableReader> fullyExpiredSSTables)
+    protected boolean buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTableReader> fullyExpiredSSTables)
{
if(!cfs.isCompactionDiskSpaceCheckEnabled() && compactionType == OperationType.COMPACTION)
{
logger.info("Compaction space check is disabled");
return; // try to compact all SSTables
logger.info("Compaction space check is disabled - trying to compact all sstables");
return true;
}

final Set<SSTableReader> nonExpiredSSTables = Sets.difference(transaction.originals(), fullyExpiredSSTables);
Expand Down Expand Up @@ -395,8 +403,9 @@ protected void buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTableR
{
CompactionManager.instance.incrementCompactionsReduced();
CompactionManager.instance.incrementSstablesDropppedFromCompactions(sstablesRemoved);
+            return false;
}

+        return true;
}

protected int getLevel()
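For orientation, here is a minimal sketch, under assumed simplifications, of the loop that buildCompactionCandidatesForAvailableDiskSpace implements around reduceScopeForLimitedSpace: drop the largest sstable until the estimated output fits on disk, and report whether the original set survived intact. The names below are hypothetical, not the real method's internals.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Assumed, simplified model of the scope-reduction loop. The real method also
// estimates the size of the compacted output rather than summing inputs.
public class ScopeReductionSketch
{
    // Returns true if nothing was dropped; false means the compaction scope
    // shrank and (per this fix) the caller must refresh the overlap tracking.
    static boolean reduceUntilItFits(List<Long> sstableSizes, long availableBytes)
    {
        List<Long> candidates = new ArrayList<>(sstableSizes);
        boolean reduced = false;
        while (candidates.size() > 1 && total(candidates) > availableBytes)
        {
            candidates.remove(Collections.max(candidates)); // drop the largest and retry
            reduced = true;
        }
        return !reduced;
    }

    private static long total(List<Long> sizes)
    {
        long sum = 0;
        for (long size : sizes)
            sum += size;
        return sum;
    }
}

Returning a boolean instead of refreshing inside the method keeps the refresh at the single call site in runMayThrow, where the controller is in scope.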
114 changes: 114 additions & 0 deletions test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Test;

import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.junit.Assert.assertEquals;

public class CompactionOverlappingSSTableTest extends TestBaseImpl
{
@Test
public void partialCompactionOverlappingTest() throws IOException
{

try (Cluster cluster = init(builder().withNodes(1)
.withDataDirCount(1)
.withInstanceInitializer(BB::install)
.start()))
{
cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class': 'SimpleStrategy', 'replication_factor':3}"));
cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key) with compaction = {'class':'SizeTieredCompactionStrategy', 'enabled': 'false'} AND gc_grace_seconds=0"));
Set<Integer> expected = Sets.newHashSetWithExpectedSize(990);
for (int i = 0; i < 1000; i++)
{
cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (id) values (?)"), ConsistencyLevel.ONE, i);
if (i >= 10)
expected.add(i);
}
cluster.get(1).flush(KEYSPACE);
for (int i = 0; i < 10; i++)
{
cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl where id = ?"), ConsistencyLevel.ONE, i);
cluster.get(1).flush(KEYSPACE);
}
assertEquals(expected, Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl"), ConsistencyLevel.ONE))
.map(x -> x[0])
.collect(Collectors.toSet()));

Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); // make sure tombstones are gc:able

cluster.get(1).runOnInstance(() -> {
BB.enabled.set(true);
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
cfs.forceMajorCompaction();
assertEquals("We should have 2 sstables (not 1) after major compaction since we reduced the scope of the compaction",
2, Iterables.size(cfs.getSSTables(SSTableSet.CANONICAL)));
});
assertEquals(expected, Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl"), ConsistencyLevel.ONE))
.map(x -> x[0])
.collect(Collectors.toSet()));
}
}

public static class BB
{
static AtomicBoolean enabled = new AtomicBoolean();
public static void install(ClassLoader cl, Integer i)
{
new ByteBuddy().rebase(Directories.class)
.method(named("hasAvailableDiskSpace").and(takesArguments(2)))
.intercept(MethodDelegation.to(BB.class))
.make()
.load(cl, ClassLoadingStrategy.Default.INJECTION);
}

public static boolean hasAvailableDiskSpace(long ignore1, long ignore2)
{
if (enabled.get())
{
enabled.set(false);
return false;
}
return true;
}
}
}
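The BB helper above works because the in-jvm dtest framework runs the instance initializer against each node's isolated classloader before any server classes load, so Directories can still be rebased and injected. The standalone sketch below reproduces the same one-shot interception pattern with illustrative class names (DiskCheck and Stub stand in for Directories and BB), using a child-first loader instead of pre-startup injection.

import java.util.concurrent.atomic.AtomicBoolean;

import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;

import static net.bytebuddy.matcher.ElementMatchers.named;

// Standalone sketch of the one-shot interception pattern used by BB above.
public class OneShotInterceptDemo
{
    public static class DiskCheck
    {
        public boolean hasAvailableDiskSpace() { return true; }
    }

    public static class Stub
    {
        public static final AtomicBoolean failOnce = new AtomicBoolean();

        public static boolean hasAvailableDiskSpace()
        {
            // Report "no space" exactly once, then fall back to normal
            // behaviour, mirroring BB.hasAvailableDiskSpace in the test above.
            return !failOnce.compareAndSet(true, false);
        }
    }

    public static void main(String[] args) throws Exception
    {
        // The dtest injects into each node's classloader before Directories is
        // loaded; a child-first loader gives the same effect inside one JVM.
        Class<?> rebased = new ByteBuddy().rebase(DiskCheck.class)
                                          .method(named("hasAvailableDiskSpace"))
                                          .intercept(MethodDelegation.to(Stub.class))
                                          .make()
                                          .load(OneShotInterceptDemo.class.getClassLoader(),
                                                ClassLoadingStrategy.Default.CHILD_FIRST)
                                          .getLoaded();
        Object check = rebased.getDeclaredConstructor().newInstance();
        Stub.failOnce.set(true);
        System.out.println(rebased.getMethod("hasAvailableDiskSpace").invoke(check)); // false, once
        System.out.println(rebased.getMethod("hasAvailableDiskSpace").invoke(check)); // true again
    }
}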
207 changes: 207 additions & 0 deletions test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java
@@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;

import java.util.Iterator;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.FBUtilities;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;

public class PartialCompactionsTest extends SchemaLoader
{
static final String KEYSPACE = PartialCompactionsTest.class.getSimpleName();
static final String TABLE = "testtable";

@BeforeClass
public static void initSchema()
{
CompactionManager.instance.disableAutoCompaction();

SchemaLoader.createKeyspace(KEYSPACE,
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(KEYSPACE, TABLE));

LimitableDataDirectory.applyTo(KEYSPACE, TABLE);
}

@Before
public void prepareCFS()
{
LimitableDataDirectory.setAvailableSpace(cfStore(), null);
}

@After
public void truncateCF()
{
cfStore().truncateBlocking();
LifecycleTransaction.waitForDeletions();
}

private static ColumnFamilyStore cfStore()
{
return Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
}

@Test
public void shouldNotResurrectDataFromSSTableExcludedDueToInsufficientSpace()
{
// given
ColumnFamilyStore cfs = cfStore();
int few = 10, many = 10 * few;

// a large sstable as the oldest
createDataSSTable(cfs, 0, many);
// more inserts (to have more than one sstable to compact)
createDataSSTable(cfs, many, many + few);
// delete data that's in both of the prior sstables
createTombstonesSSTable(cfs, many - few / 2, many + few / 2);

// emulate there not being enough space to compact all sstables
LimitableDataDirectory.setAvailableSpace(cfs, enoughSpaceForAllButTheLargestSSTable(cfs));

// when - run a compaction where all tombstones have timed out
FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, Integer.MAX_VALUE, false));

// then - the tombstones should not be removed
assertEquals("live sstables after compaction", 2, cfs.getLiveSSTables().size());
assertEquals("remaining live rows after compaction", many, liveRows(cfs));
}

private static long enoughSpaceForAllButTheLargestSSTable(ColumnFamilyStore cfs)
{
long totalSize = 1, maxSize = 0;
for (SSTableReader ssTable : cfs.getLiveSSTables())
{
long size = ssTable.onDiskLength();
if (size > maxSize) maxSize = size;
totalSize += size;
}
return totalSize - maxSize;
}

private static int liveRows(ColumnFamilyStore cfs)
{
return Util.getAll(Util.cmd(cfs, "key1").build()).stream()
.map(partition -> count(partition.rowIterator()))
.reduce(Integer::sum)
.orElse(0);
}

private static int count(Iterator<?> iter)
{
try (CloseableIterator<?> unused = iter instanceof CloseableIterator ? (CloseableIterator<?>) iter : null)
{
int count = 0;
for (; iter.hasNext(); iter.next())
{
count++;
}
return count;
}
}

private static void createDataSSTable(ColumnFamilyStore cfs, int firstKey, int endKey)
{
for (int i = firstKey; i < endKey; i++)
{
new RowUpdateBuilder(cfs.metadata(), 0, "key1")
.clustering(String.valueOf(i))
.add("val", String.valueOf(i))
.build()
.applyUnsafe();
}
cfs.forceBlockingFlush();
}

private static void createTombstonesSSTable(ColumnFamilyStore cfs, int firstKey, int endKey)
{
for (int i = firstKey; i < endKey; i++)
{
RowUpdateBuilder.deleteRow(cfs.metadata(), 1, "key1", String.valueOf(i)).applyUnsafe();
}
cfs.forceBlockingFlush();
}

private static class LimitableDataDirectory extends Directories.DataDirectory
{
private Long availableSpace;

LimitableDataDirectory(Directories.DataDirectory dataDirectory)
{
super(dataDirectory.location);
}

@Override
public long getAvailableSpace()
{
if (availableSpace != null)
return availableSpace;
return super.getAvailableSpace();
}

public static void setAvailableSpace(ColumnFamilyStore cfs, Long availableSpace)
{
for (Directories.DataDirectory location : cfs.getDirectories().getWriteableLocations())
{
assertThat("ColumnFamilyStore set up with ability to emulate limited disk space",
location, instanceOf(LimitableDataDirectory.class));
((LimitableDataDirectory) location).availableSpace = availableSpace;
}
}

public static void applyTo(String ks, String cf)
{
Keyspace keyspace = Keyspace.open(ks);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf);
TableMetadataRef metadata = store.metadata;
keyspace.dropCf(metadata.id);
ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(keyspace, cf, metadata, wrapDirectoriesOf(store), false, false, true);
keyspace.initCfCustom(cfs);
}

private static Directories wrapDirectoriesOf(ColumnFamilyStore cfs)
{
Directories.DataDirectory[] original = cfs.getDirectories().getWriteableLocations();
Directories.DataDirectory[] wrapped = new Directories.DataDirectory[original.length];
for (int i = 0; i < wrapped.length; i++)
{
wrapped[i] = new LimitableDataDirectory(original[i]);
}
return new Directories(cfs.metadata(), wrapped);
}
}
}
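As a usage note, the helper above is applied once after schema setup and then toggled per test. A minimal sketch using the names defined in this file (assumed to run inside the test class, where KEYSPACE and TABLE are in scope):

// Assumed usage of the LimitableDataDirectory helper defined above.
LimitableDataDirectory.applyTo(KEYSPACE, TABLE);       // once, after the schema exists
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
LimitableDataDirectory.setAvailableSpace(cfs, 1024L);  // pretend only 1 KiB is free
// ... trigger the compaction under test; scope reduction should now kick in ...
LimitableDataDirectory.setAvailableSpace(cfs, null);   // restore the real free-space check

Compared with the dtest's bytecode interception, wrapping DataDirectory keeps the fault injection inside the Directories API that the production code already calls.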
