Skip to content

Commit

Permalink
added coverage in ImportIndexedSSTablesTest for the case where we imp…
Browse files Browse the repository at this point in the history
…ort already existing SAI index components (i.e. make sure we checksum them)
  • Loading branch information
maedhroz committed Jul 17, 2023
1 parent 5f88656 commit 50a01bd
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 14 deletions.
5 changes: 4 additions & 1 deletion src/java/org/apache/cassandra/db/SSTableImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ synchronized List<String> importNewSSTables(Options options)
{
abortIfDraining();

cfs.indexManager.buildIndexesBlockingIncremental(newSSTables);
// Validate existing index components on disk, and then build any that are missing:
if (!cfs.indexManager.validateStorageAttachedIndexes(newSSTables, false))
cfs.indexManager.buildIndexesBlockingIncremental(newSSTables);

cfs.getTracker().addSSTables(newSSTables);
for (SSTableReader reader : newSSTables)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ public void finished()
{
try
{
// SSTable-attached indexes may have been streamed completely, and they must be validated:
cfs.indexManager.validateStorageAttachedIndexes(readers);
// Validate the SSTable-attached indexes that should have been streamed completely:
cfs.indexManager.validateStorageAttachedIndexes(readers, true);
}
catch (IOException e)
{
Expand Down
41 changes: 33 additions & 8 deletions src/java/org/apache/cassandra/index/SecondaryIndexManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -498,28 +498,53 @@ public static String getIndexName(String cfName)
}

// TODO: JavaDoc if this approach survives...mention that this should not be used for live SSTables
public void validateStorageAttachedIndexes(Collection<SSTableReader> sstables) throws IOException
public boolean validateStorageAttachedIndexes(Collection<SSTableReader> sstables, boolean failIncomplete) throws IOException
{
// TODO: A more generic approach via the Index interface rather than just working w/ SAI
Set<StorageAttachedIndex> toValidate = indexes.values().stream().filter(i -> i instanceof StorageAttachedIndex)
.map(i -> (StorageAttachedIndex) i)
.collect(Collectors.toSet());

if (toValidate.isEmpty())
return;
return true;

boolean complete = true;

for (SSTableReader sstable : sstables)
{
IndexDescriptor indexDescriptor = IndexDescriptor.create(sstable);

if (!indexDescriptor.isPerSSTableIndexBuildComplete())
throw new IllegalStateException(indexDescriptor.logMessage("Incomplete per-SSTable index"));

indexDescriptor.checksumPerSSTableComponents();
if (indexDescriptor.isPerSSTableIndexBuildComplete())
{
indexDescriptor.checksumPerSSTableComponents();

for (StorageAttachedIndex index : toValidate)
indexDescriptor.checksumPerIndexComponents(index.getIndexContext());
for (StorageAttachedIndex index : toValidate)
{
if (indexDescriptor.isPerColumnIndexBuildComplete(index.getIndexContext()))
{
indexDescriptor.checksumPerIndexComponents(index.getIndexContext());
}
else if (failIncomplete)
{
throw new IllegalStateException(indexDescriptor.logMessage("Incomplete per-column index build"));
}
else
{
complete = false;
}
}
}
else if (failIncomplete)
{
throw new IllegalStateException(indexDescriptor.logMessage("Incomplete per-SSTable index build"));
}
else
{
complete = false;
}
}

return complete;
}

// TODO: JavaDoc if this approach survives...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public class StorageAttachedIndexBuilder extends SecondaryIndexBuilder
@Override
public void build()
{
logger.debug(logMessage("Starting full index build"));
logger.debug(logMessage(String.format("Starting %s %s index build...", isInitialBuild ? "initial" : "non-initial", isFullRebuild ? "full" : "partial")));

for (Map.Entry<SSTableReader, Set<StorageAttachedIndex>> e : sstables.entrySet())
{
SSTableReader sstable = e.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.cassandra.distributed.test.sai;

import java.io.IOException;

import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
Expand All @@ -30,6 +32,9 @@
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.index.sai.StorageAttachedIndexBuilder;
import org.apache.cassandra.index.sai.disk.v1.SAICodecUtils;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.IndexInput;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -45,7 +50,7 @@ public void testIndexBuildingFailureDuringImport() throws Exception
{
try (Cluster cluster = init(Cluster.build(1)
.withConfig(c -> c.with(NETWORK, GOSSIP))
.withInstanceInitializer(ByteBuddyHelper::install)
.withInstanceInitializer(ByteBuddyHelper::installBuildInterruption)
.start()))
{
cluster.disableAutoCompaction(KEYSPACE);
Expand Down Expand Up @@ -126,10 +131,104 @@ public void testImportBuildsSSTableIndexes() throws Exception
}
}

@Test
public void testValidationFailureDuringImport() throws Exception
{
try (Cluster cluster = init(Cluster.build(1)
.withConfig(c -> c.with(NETWORK, GOSSIP))
.withInstanceInitializer(ByteBuddyHelper::installValidationError)
.start()))
{
cluster.disableAutoCompaction(KEYSPACE);
cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (pk int PRIMARY KEY, v text)"));

IInvokableInstance first = cluster.get(1);

first.executeInternal(withKeyspace("INSERT INTO %s.test(pk, v) VALUES (?, ?)"), 1, "v1");
first.flush(KEYSPACE);

Object[][] rs = first.executeInternal(withKeyspace("select pk from %s.test where pk = ?"), 1);
assertThat(rs.length).isEqualTo(1);

cluster.schemaChange(withKeyspace("CREATE CUSTOM INDEX test_v_index ON %s.test(v) USING 'StorageAttachedIndex'"));
SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, "test_v_index");

rs = first.executeInternal(withKeyspace("select pk from %s.test where v = ?"), "v1");
assertThat(rs.length).isEqualTo(1);
assertThat(rs[0][0]).isEqualTo(1);

first.runOnInstance(() -> {
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("test");
cfs.clearUnsafe();
});

rs = first.executeInternal(withKeyspace("select pk from %s.test where pk = ?"), 1);
assertThat(rs.length).isEqualTo(0);
rs = first.executeInternal(withKeyspace("select pk from %s.test where v = ?"), "v1");
assertThat(rs.length).isEqualTo(0);

first.runOnInstance(() ->
assertThatThrownBy(() ->
ColumnFamilyStore.loadNewSSTables(KEYSPACE, "test")).hasRootCauseExactlyInstanceOf(CorruptIndexException.class));

rs = first.executeInternal(withKeyspace("select pk from %s.test where pk = ?"), 1);
assertThat(rs.length).isEqualTo(0);
rs = first.executeInternal(withKeyspace("select pk from %s.test where v = ?"), "v1");
assertThat(rs.length).isEqualTo(0);
}
}

@Test
public void testImportIncludesExistingSSTableIndexes() throws Exception
{
try (Cluster cluster = init(Cluster.build(1)
.withConfig(c -> c.with(NETWORK, GOSSIP))
.start()))
{
cluster.disableAutoCompaction(KEYSPACE);
cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (pk int PRIMARY KEY, v text)"));

IInvokableInstance first = cluster.get(1);

first.executeInternal(withKeyspace("INSERT INTO %s.test(pk, v) VALUES (?, ?)"), 1, "v1");
first.flush(KEYSPACE);

Object[][] rs = first.executeInternal(withKeyspace("select pk from %s.test where pk = ?"), 1);
assertThat(rs.length).isEqualTo(1);

cluster.schemaChange(withKeyspace("CREATE CUSTOM INDEX test_v_index ON %s.test(v) USING 'StorageAttachedIndex'"));
SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, "test_v_index");

rs = first.executeInternal(withKeyspace("select pk from %s.test where v = ?"), "v1");
assertThat(rs.length).isEqualTo(1);
assertThat(rs[0][0]).isEqualTo(1);

first.runOnInstance(() -> {
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("test");
cfs.clearUnsafe();
});

rs = first.executeInternal(withKeyspace("select pk from %s.test where pk = ?"), 1);
assertThat(rs.length).isEqualTo(0);
rs = first.executeInternal(withKeyspace("select pk from %s.test where v = ?"), "v1");
assertThat(rs.length).isEqualTo(0);

first.runOnInstance(() -> ColumnFamilyStore.loadNewSSTables(KEYSPACE, "test"));

rs = first.executeInternal(withKeyspace("select pk from %s.test where pk = ?"), 1);
assertThat(rs.length).isEqualTo(1);
assertThat(rs[0][0]).isEqualTo(1);

rs = first.executeInternal(withKeyspace("select pk from %s.test where v = ?"), "v1");
assertThat(rs.length).isEqualTo(1);
assertThat(rs[0][0]).isEqualTo(1);
}
}

public static class ByteBuddyHelper
{
@SuppressWarnings("resource")
static void install(ClassLoader loader, int node)
static void installBuildInterruption(ClassLoader loader, int node)
{
new ByteBuddy().redefine(StorageAttachedIndexBuilder.class)
.method(named("isStopRequested"))
Expand All @@ -143,5 +242,21 @@ public static boolean isStopRequested()
{
return true;
}

@SuppressWarnings("resource")
static void installValidationError(ClassLoader loader, int node)
{
new ByteBuddy().redefine(SAICodecUtils.class)
.method(named("validateChecksum"))
.intercept(MethodDelegation.to(ByteBuddyHelper.class))
.make()
.load(loader, ClassLoadingStrategy.Default.INJECTION);
}

@SuppressWarnings("unused")
public static void validateChecksum(IndexInput input) throws IOException
{
throw new CorruptIndexException("Injected failure!", "Test resource");
}
}
}

0 comments on commit 50a01bd

Please sign in to comment.