8,464 changes: 3,722 additions & 4,742 deletions .circleci/config.yml

Large diffs are not rendered by default.

548 changes: 347 additions & 201 deletions .circleci/config_11_and_17.yml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion .circleci/generate_11_and_17.sh
@@ -18,7 +18,7 @@
#

BASEDIR=`dirname $0`
BASE_BRANCH=trunk
BASE_BRANCH=cep-7-sai
set -e

die ()
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/index/sai/IndexContext.java
@@ -162,9 +162,9 @@ public IndexMetadata getIndexMetadata()
/**
* @return A set of SSTables which have attached to them invalid index components.
*/
public Collection<SSTableContext> onSSTableChanged(Collection<SSTableReader> oldSSTables, Collection<SSTableContext> newSSTables, boolean validate)
public Collection<SSTableContext> onSSTableChanged(Collection<SSTableReader> oldSSTables, Collection<SSTableContext> newSSTables, IndexValidation validation)
{
return viewManager.update(oldSSTables, newSSTables, validate);
return viewManager.update(oldSSTables, newSSTables, validation);
}

public ColumnMetadata getDefinition()
@@ -408,7 +408,7 @@ public String logMessage(String message)
* @return the indexes that are built on the given SSTables on the left and corrupted indexes'
* corresponding contexts on the right
*/
public Pair<Collection<SSTableIndex>, Collection<SSTableContext>> getBuiltIndexes(Collection<SSTableContext> sstableContexts, boolean validate)
public Pair<Collection<SSTableIndex>, Collection<SSTableContext>> getBuiltIndexes(Collection<SSTableContext> sstableContexts, IndexValidation validation)
{
Set<SSTableIndex> valid = new HashSet<>(sstableContexts.size());
Set<SSTableContext> invalid = new HashSet<>();
@@ -433,9 +433,9 @@ public Pair<Collection<SSTableIndex>, Collection<SSTableContext>> getBuiltIndexe

try
{
if (validate)
if (validation != IndexValidation.NONE)
{
if (!sstableContext.indexDescriptor.validatePerIndexComponents(this))
if (!sstableContext.indexDescriptor.validatePerIndexComponents(this, validation))
{
logger.warn(logMessage("Invalid per-column component for SSTable {}"), sstableContext.descriptor());
invalid.add(sstableContext);
38 changes: 38 additions & 0 deletions src/java/org/apache/cassandra/index/sai/IndexValidation.java
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.index.sai;

public enum IndexValidation
{
/**
* No validation to be performed
*/
NONE,

/**
* Basic header/footer validation, but no data validation (fast)
*/
HEADER_FOOTER,

/**
* Full validation with checksumming data (slow)
*/
CHECKSUM

}
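
For orientation, the call sites in the rest of this diff end up selecting these modes roughly as follows. This is an illustrative summary drawn from the changes below, not code from the patch (the class name is invented):

import org.apache.cassandra.index.sai.IndexValidation;

// Illustrative summary only: which validation mode the patch picks on each ingest path.
final class ValidationModeSummary
{
    static final IndexValidation STARTUP_BUILD     = IndexValidation.HEADER_FOOTER; // initial build while the node is starting
    static final IndexValidation MEMTABLE_FLUSH    = IndexValidation.NONE;          // index files we have just written ourselves
    static final IndexValidation COMPACTION_RESULT = IndexValidation.NONE;          // also produced locally, so validation is skipped
    static final IndexValidation STREAMED_SSTABLE  = IndexValidation.CHECKSUM;      // SSTables arriving via streaming or import
}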
@@ -47,12 +47,12 @@ public class SSTableContextManager
*
* @param removed SSTables being removed
* @param added SSTables being added
* @param validate if true, header and footer will be validated.
* @param validation Controls how indexes should be validated
*
* @return a set of contexts for SSTables with valid per-SSTable components, and a set of
* SSTables with invalid or missing components
*/
public Pair<Set<SSTableContext>, Set<SSTableReader>> update(Collection<SSTableReader> removed, Iterable<SSTableReader> added, boolean validate)
public Pair<Set<SSTableContext>, Set<SSTableReader>> update(Collection<SSTableReader> removed, Iterable<SSTableReader> added, IndexValidation validation)
{
release(removed);

@@ -77,7 +77,7 @@ public Pair<Set<SSTableContext>, Set<SSTableReader>> update(Collection<SSTableRe
try
{
// Only validate on restart or newly refreshed SSTable. Newly built files are unlikely to be corrupted.
if (validate && !sstableContexts.containsKey(sstable) && !indexDescriptor.validatePerSSTableComponents())
if (!sstableContexts.containsKey(sstable) && !indexDescriptor.validatePerSSTableComponents(validation))
{
logger.warn(indexDescriptor.logMessage("Invalid per-SSTable component for SSTable {}"), sstable.descriptor);
invalid.add(sstable);
@@ -271,10 +271,11 @@ public Callable<?> getInitializationTask()
// New storage-attached indexes will be available for queries after on disk index data are built.
// Memtable data will be indexed via flushing triggered by schema change
// We only want to validate the index files if we are starting up
return () -> startInitialBuild(baseCfs, StorageService.instance.isStarting()).get();
IndexValidation validation = StorageService.instance.isStarting() ? IndexValidation.HEADER_FOOTER : IndexValidation.NONE;
return () -> startInitialBuild(baseCfs, validation).get();
}

private Future<?> startInitialBuild(ColumnFamilyStore baseCfs, boolean validate)
private Future<?> startInitialBuild(ColumnFamilyStore baseCfs, IndexValidation validation)
{
if (baseCfs.indexManager.isIndexQueryable(this))
{
@@ -303,7 +304,7 @@ private Future<?> startInitialBuild(ColumnFamilyStore baseCfs, boolean validate)

assert indexGroup != null : "Index group does not exist for table " + baseCfs.keyspace + '.' + baseCfs.name;

List<SSTableReader> nonIndexed = findNonIndexedSSTables(baseCfs, indexGroup, validate);
List<SSTableReader> nonIndexed = findNonIndexedSSTables(baseCfs, indexGroup, validation);

if (nonIndexed.isEmpty())
{
@@ -430,7 +431,7 @@ private Future<?> startPreJoinTask()

assert indexGroup != null : "Index group does not exist for table";

Collection<SSTableReader> nonIndexed = findNonIndexedSSTables(baseCfs, indexGroup, true);
Collection<SSTableReader> nonIndexed = findNonIndexedSSTables(baseCfs, indexGroup, IndexValidation.HEADER_FOOTER);

if (nonIndexed.isEmpty())
{
@@ -522,13 +523,13 @@ public void validate(PartitionUpdate update) throws InvalidRequestException
*
* @return a list SSTables without attached indexes
*/
private synchronized List<SSTableReader> findNonIndexedSSTables(ColumnFamilyStore baseCfs, StorageAttachedIndexGroup group, boolean validate)
private synchronized List<SSTableReader> findNonIndexedSSTables(ColumnFamilyStore baseCfs, StorageAttachedIndexGroup group, IndexValidation validation)
{
Set<SSTableReader> sstables = baseCfs.getLiveSSTables();

// Initialize the SSTable indexes w/ valid existing components...
assert group != null : "Missing index group on " + baseCfs.name;
group.onSSTableChanged(Collections.emptyList(), sstables, Collections.singleton(this), validate);
group.onSSTableChanged(Collections.emptyList(), sstables, Collections.singleton(this), validation);

// ...then identify and rebuild the SSTable indexes that are missing.
List<SSTableReader> nonIndexed = new ArrayList<>();
@@ -261,8 +261,9 @@ private CountDownLatch shouldWritePerSSTableFiles(SSTableReader sstable)
{
// if per-table files are incomplete or checksum failed during full rebuild.
Contributor comment:

nit: Comment needs to change slightly now, something like...

// if per-SSTable files are incomplete, this is a full rebuild, or they are complete but checksum failed

IndexDescriptor indexDescriptor = IndexDescriptor.create(sstable);
if (!indexDescriptor.isPerSSTableIndexBuildComplete() ||
(isFullRebuild && !indexDescriptor.validatePerSSTableComponentsChecksum()))
if (!indexDescriptor.isPerSSTableIndexBuildComplete()
|| isFullRebuild
Contributor comment:

When we initially wrote this, we couldn't think of any cases where the SSTable-level indexes would need to be rebuilt if existing and valid. The column-level bits are typically what need to change, say if you slightly changed filtering/analysis on a text field. However, I'm not opposed to this if you think it's clearer that "full rebuild" really means "full" rebuild :)

|| !indexDescriptor.validatePerSSTableComponents(IndexValidation.CHECKSUM))
{
CountDownLatch latch = CountDownLatch.newCountDownLatch(1);
if (inProgress.putIfAbsent(sstable, latch) == null)
@@ -307,7 +308,7 @@ private void completeSSTable(SSTableFlushObserver indexWriter,

// register custom index components into existing sstables
sstable.registerComponents(StorageAttachedIndexGroup.getLiveComponents(sstable, existing), tracker);
Set<StorageAttachedIndex> incomplete = group.onSSTableChanged(Collections.emptyList(), Collections.singleton(sstable), existing, false);
Set<StorageAttachedIndex> incomplete = group.onSSTableChanged(Collections.emptyList(), Collections.singleton(sstable), existing, IndexValidation.NONE);

if (!incomplete.isEmpty())
{
@@ -250,15 +250,15 @@ public void handleNotification(INotification notification, Object sender)
SSTableAddedNotification notice = (SSTableAddedNotification) notification;

// Avoid validation for index files just written following Memtable flush.
boolean validate = !notice.memtable().isPresent();
IndexValidation validate = notice.memtable().isPresent() ? IndexValidation.NONE : IndexValidation.CHECKSUM;
Contributor comment:

So this is the crux of the change, correct? (i.e. SSTables added via streaming or import will be checksummed, unlike before, where they would undergo only header/footer validation?)

Contributor comment (maedhroz, Jul 5, 2023):

One of the general things I looked at while reviewing here was whether the notification order of the different INotificationConsumer things matters. StorageAttachedIndexGroup and SecondaryIndexManager in particular are the interesting ones. Basically...do we checksum the things we should and not the things we shouldn't, given we happen to have SIM registered before the index group? Here's what I've come up with:

1.) When we stream entire SSTables w/ their attached index components (or locally import new SSTables w/ index components already built), SIM handles the notification, but the index build it triggers should be a no-op. Even if it built something it wouldn't register as a full rebuild and no checksumming would be done. Then when StorageAttachedIndexGroup handles the notification here, it does the checksum, effectively protecting us from corruption introduced by a streaming source. This all seems reasonable.

2.) When we don't stream an entire SSTable during repair or partial SSTable bootstrapping (or locally import new SSTables w/ index components not already built), SIM handles the notification and blocks to build SSTable-attached index bits. Then (again, this is an accident of our ordering) StorageAttachedIndexGroup handles the notification. It sees that there isn't a Memtable and validates the checksum. This isn't the end of the world, but it would seem unnecessary, given we just build the indexes from the streamed SSTables. If we knew that all of the added SSTables didn't come with SAI components, we'd bypass checksumming, right?

I'm not sure how exactly we want to address this, or whether we would want to address it in this patch (although that might be nice, since we have it loaded into our working memories), but perhaps we could agree that it's an issue (or not, and that I'm missing something) and go from there?

Two things that come to mind to "fix" this are a.) having more information in the notification or b.) having the index group handle notifications before SIM does. (SIM only handles SSTableAddedNotification FWIW).

CC @mike-tr-adamson @adelapena

Contributor comment:

We have been looking at this in some detail today to try and establish the best way to correct it.

From our perspective, the SSTableAddedNotification is handled in two places: by the SecondaryIndexManager, which will invoke a fullRebuild = false index build on the added SSTables, and by the StorageAttachedIndexGroup, which will validate and register the SSTables.

The problem with the current implementation is:

  • The invoked index build will rebuild the per-column index components for fullRebuild = false even if the components are present and valid. In this case, though, we only build the per-SSTable components if they are invalid.
  • The SAIG handler will validate the components even though they have been already validated by the build.

What should be noted here is that the SIM will only invoke an index build if a memtable is not present in the notification, and the SAIG currently only validates the index components if the memtable isn't present but still registers the components in either case.

We feel that the following changes could be made to improve this:

  1. The index build should only build the per-column indexes if fullRebuild=true, the index doesn't exist, or the index is invalid. In either case, the SSTables must be registered (on build completion or immediately).
  2. The SAIG should only register the SSTables if the memtable is present and shouldn't validate the components because this is the result of a memtable flush.

This will require some rework of the index builder, to make sure that the SSTables are always registered correctly and, as such, we would prefer that this was done on a separate ticket from this one. Which phase that ticket goes into is up for discussion. The downsides of the current implementation are that we can rebuild indexes that already have valid components and we can end up validating components twice.

WDYT?
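
Read as a sketch, the gating proposed in point 1 above might look roughly like this; the helper method names are hypothetical and this is not code from the patch:

// Hypothetical sketch of the proposed per-column build gating; method names are illustrative,
// except validatePerIndexComponents, which matches the signature introduced in this diff.
static boolean shouldBuildPerColumnIndex(boolean isFullRebuild, IndexDescriptor descriptor, IndexContext context)
{
    return isFullRebuild                                                                  // explicit full rebuild
           || !descriptor.isPerColumnIndexBuildComplete(context)                          // index doesn't exist yet
           || !descriptor.validatePerIndexComponents(context, IndexValidation.CHECKSUM);  // existing index is invalid
}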

Contributor comment:

@mike-tr-adamson @pkolaczk I'm okay w/ doing this in another Jira. There's enough risk to make sure we focus on it specifically, and after this patch we'll only be doing something sub-optimally, not incorrectly. Now...in solution space...

The index build should only build the per-column indexes if fullRebuild=true, the index doesn't exist, or the index is invalid. In either case, the SSTables must be registered (on build completion or immediately).

I don't see any problem w/ this, although I'm not sure I understand the problem in the streaming context.

The invoked index build will rebuild the per-column index components for fullRebuild = false even if the components are present and valid. We do, though, only build the per-SSTable components if they are invalid, in this case.

Right after streaming, no index components for the new SSTables are present, right?

The SAIG should only register the SSTables if the memtable is present and shouldn't validate the components because this is the result of a memtable flush.

StorageAttachedIndexGroup registering and validating newly streamed entire index components is what makes this patch work, right? If we stopped doing that, who would register/checksum when the SIM-triggered build is a no-op?

onSSTableChanged(Collections.emptySet(), notice.added, indexes, validate);
}
else if (notification instanceof SSTableListChangedNotification)
{
SSTableListChangedNotification notice = (SSTableListChangedNotification) notification;

// Avoid validation for index files just written during compaction.
onSSTableChanged(notice.removed, notice.added, indexes, false);
onSSTableChanged(notice.removed, notice.added, indexes, IndexValidation.NONE);
}
else if (notification instanceof MemtableRenewedNotification)
{
@@ -298,9 +298,9 @@ void dropIndexSSTables(Collection<SSTableReader> ss, StorageAttachedIndex index)
* files being corrupt or being unable to successfully update their views
*/
synchronized Set<StorageAttachedIndex> onSSTableChanged(Collection<SSTableReader> removed, Iterable<SSTableReader> added,
Set<StorageAttachedIndex> indexes, boolean validate)
Set<StorageAttachedIndex> indexes, IndexValidation validation)
{
Pair<Set<SSTableContext>, Set<SSTableReader>> results = contextManager.update(removed, added, validate);
Pair<Set<SSTableContext>, Set<SSTableReader>> results = contextManager.update(removed, added, validation);

if (!results.right.isEmpty())
{
@@ -321,7 +321,7 @@ synchronized Set<StorageAttachedIndex> onSSTableChanged(Collection<SSTableReader

for (StorageAttachedIndex index : indexes)
{
Collection<SSTableContext> invalid = index.getIndexContext().onSSTableChanged(removed, results.left, validate);
Collection<SSTableContext> invalid = index.getIndexContext().onSSTableChanged(removed, results.left, validation);

if (!invalid.isEmpty())
{
@@ -410,8 +410,8 @@ public SSTableContextManager sstableContextManager()
public void unsafeReload()
{
contextManager.clear();
onSSTableChanged(baseCfs.getLiveSSTables(), Collections.emptySet(), indexes, false);
onSSTableChanged(Collections.emptySet(), baseCfs.getLiveSSTables(), indexes, true);
onSSTableChanged(baseCfs.getLiveSSTables(), Collections.emptySet(), indexes, IndexValidation.NONE);
onSSTableChanged(Collections.emptySet(), baseCfs.getLiveSSTables(), indexes, IndexValidation.HEADER_FOOTER);
}

/**
@@ -422,6 +422,6 @@ public void reset()
{
contextManager.clear();
indexes.forEach(StorageAttachedIndex::makeIndexNonQueryable);
onSSTableChanged(baseCfs.getLiveSSTables(), Collections.emptySet(), indexes, false);
onSSTableChanged(baseCfs.getLiveSSTables(), Collections.emptySet(), indexes, IndexValidation.NONE);
}
}
@@ -31,6 +31,7 @@
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.IndexValidation;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.StorageAttachedIndex;
import org.apache.cassandra.index.sai.disk.PerColumnIndexWriter;
@@ -327,28 +328,25 @@ public long sizeOnDiskOfPerIndexComponent(IndexComponent indexComponent, IndexCo
}

@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public boolean validatePerIndexComponents(IndexContext indexContext)
public boolean validatePerIndexComponents(IndexContext indexContext, IndexValidation validation)
{
logger.info(indexContext.logMessage("Validating per-column index components"));
return version.onDiskFormat().validatePerColumnIndexComponents(this, indexContext, false);
}
if (validation == IndexValidation.NONE)
return true;

@VisibleForTesting
public boolean validatePerIndexComponentsChecksum(IndexContext indexContext)
{
return version.onDiskFormat().validatePerColumnIndexComponents(this, indexContext, true);
logger.info(indexContext.logMessage("Validating per-column index components using mode " + validation));
boolean checksum = validation == IndexValidation.CHECKSUM;
return version.onDiskFormat().validatePerColumnIndexComponents(this, indexContext, checksum);
}

@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public boolean validatePerSSTableComponents()
public boolean validatePerSSTableComponents(IndexValidation validation)
{
return version.onDiskFormat().validatePerSSTableIndexComponents(this, false);
}
if (validation == IndexValidation.NONE)
return true;

@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public boolean validatePerSSTableComponentsChecksum()
{
return version.onDiskFormat().validatePerSSTableIndexComponents(this, true);
logger.info(logMessage("Validating per-sstable index components using mode " + validation));
boolean checksum = validation == IndexValidation.CHECKSUM;
return version.onDiskFormat().validatePerSSTableIndexComponents(this, checksum);
}

public void deletePerSSTableIndexComponents()
69 changes: 11 additions & 58 deletions src/java/org/apache/cassandra/index/sai/disk/io/IndexFileUtils.java
@@ -19,7 +19,7 @@
package org.apache.cassandra.index.sai.disk.io;

import java.io.IOException;
import java.nio.ByteOrder;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

import com.google.common.annotations.VisibleForTesting;
@@ -58,7 +58,7 @@ public IndexOutputWriter openOutput(File file)
{
assert writerOption.finishOnClose() : "IndexOutputWriter relies on close() to sync with disk.";

return new IndexOutputWriter(new IncrementalChecksumSequentialWriter(file, writerOption));
return new IndexOutputWriter(new ChecksummingWriter(file, writerOption));
}

public IndexInput openInput(FileHandle handle)
@@ -75,74 +75,27 @@ public IndexInput openBlockingInput(File file)
return IndexInputReader.create(randomReader, fileHandle::close);
}

public interface ChecksumWriter
{
long getChecksum();
}

static class IncrementalChecksumSequentialWriter extends SequentialWriter implements ChecksumWriter
static class ChecksummingWriter extends SequentialWriter
{
private final CRC32 checksum = new CRC32();

IncrementalChecksumSequentialWriter(File file, SequentialWriterOption writerOption)
ChecksummingWriter(File file, SequentialWriterOption writerOption)
{
super(file, writerOption);
}

@Override
public void writeByte(int b) throws IOException
{
super.writeByte(b);
checksum.update(b);
}

@Override
public void write(byte[] b) throws IOException
{
super.write(b);
checksum.update(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException
{
super.write(b, off, len);
checksum.update(b, off, len);
}

@Override
public void writeChar(int v) throws IOException
{
super.writeChar(v);
addTochecksum(v, 2);
}

@Override
public void writeInt(int v) throws IOException
{
super.writeInt(v);
addTochecksum(v, 4);
}

@Override
public void writeLong(long v) throws IOException
{
super.writeLong(v);
addTochecksum(v, 8);
}

public long getChecksum()
public long getChecksum() throws IOException
{
flush();
return checksum.getValue();
}

private void addTochecksum(long bytes, int count)
@Override
protected void flushData()
{
int origCount = count;
if (ByteOrder.BIG_ENDIAN == buffer.order())
while (count > 0) checksum.update((int) (bytes >>> (8 * --count)));
else
while (count > 0) checksum.update((int) (bytes >>> (8 * (origCount - count--))));
ByteBuffer toAppend = buffer.duplicate().flip();
super.flushData();
checksum.update(toAppend);
}
}
}
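
The simplified writer above no longer shadows every write method; it checksums whatever is about to be flushed by feeding a flipped duplicate of the staging buffer to the CRC32. Below is a standalone sketch of that idiom (invented class name, not part of the patch) showing that the duplicate sees exactly the staged bytes while the original buffer stays reusable:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class Crc32FlushDemo
{
    public static void main(String[] args)
    {
        byte[] payload = "per-sstable index component".getBytes(StandardCharsets.UTF_8);

        // Checksum computed directly over the raw bytes.
        CRC32 direct = new CRC32();
        direct.update(payload);

        // Checksum computed the way ChecksummingWriter does it: stage bytes in a buffer,
        // then update the CRC32 from a flipped duplicate before the buffer is flushed and reused.
        ByteBuffer staging = ByteBuffer.allocate(64);
        staging.put(payload);
        CRC32 viaFlush = new CRC32();
        ByteBuffer toAppend = staging.duplicate().flip(); // same bytes, independent position/limit (JDK 9+)
        viaFlush.update(toAppend);                        // consumes only the duplicate's position..limit
        staging.clear();                                  // original buffer is free for the next flush

        System.out.println(direct.getValue() == viaFlush.getValue()); // true
    }
}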