Skip to content

Commit

Permalink
fixes following Mike's review
Browse files Browse the repository at this point in the history
  • Loading branch information
maedhroz committed Jul 20, 2023
1 parent 0cca1c3 commit 9867c66
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/SSTableImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ synchronized List<String> importNewSSTables(Options options)

// Validate existing SSTable-attached indexes, and then build any that are missing:
if (!cfs.indexManager.validateSSTableAttachedIndexes(newSSTables, false))
cfs.indexManager.buildIndexesBlockingIncremental(newSSTables);
cfs.indexManager.buildSSTableAttachedIndexesBlocking(newSSTables);

cfs.getTracker().addSSTables(newSSTables);
for (SSTableReader reader : newSSTables)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,12 @@ public void finished()
// don't stream the entire SSTable, validation is unnecessary, as the indexes have just been written
// via the SSTable flush observer, and an error there would have aborted the streaming transaction.
if (receivedEntireSSTable)
// If we do validate, any exception thrown doing so will also abort the streaming transaction:
cfs.indexManager.validateSSTableAttachedIndexes(readers, true);

finishTransaction();

// add sstables (this will build secondary indexes too, see CASSANDRA-10130)
// add sstables (this will build non-SSTable-attached secondary indexes too, see CASSANDRA-10130)
logger.debug("[Stream #{}] Received {} sstables from {} ({})", session.planId(), readers.size(), session.peer, readers);
cfs.addSSTables(readers);

Expand Down
4 changes: 3 additions & 1 deletion src/java/org/apache/cassandra/index/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package org.apache.cassandra.index;

import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
Expand Down Expand Up @@ -789,7 +790,8 @@ default void invalidate() { }
* @return true if all indexes in the group are complete and valid
* false if any index is incomplete and {@code throwOnIncomplete} is false
*
* @throws IllegalStateException if {@code throwOnIncomplete} is true and any index in the group is incomplete
* @throws IllegalStateException if {@code throwOnIncomplete} is true and any index in the group is incomplete
* @throws UncheckedIOException if there is a problem validating any on-disk component of an index in the group
*/
default boolean validateSSTableAttachedIndexes(Collection<SSTableReader> sstables, boolean throwOnIncomplete)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.cassandra.index;

import java.io.UncheckedIOException;
import java.lang.reflect.Constructor;
import java.util.*;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -503,7 +504,8 @@ public static String getIndexName(String cfName)
* @return true if all indexes in all groups are complete and valid
* false if an index in any group is incomplete and {@code throwOnIncomplete} is false
*
* @throws IllegalStateException if {@code throwOnIncomplete} is true and an index in any group is incomplete
* @throws IllegalStateException if {@code throwOnIncomplete} is true and an index in any group is incomplete
* @throws UncheckedIOException if there is a problem validating any on-disk component in any group
*/
public boolean validateSSTableAttachedIndexes(Collection<SSTableReader> sstables, boolean throwOnIncomplete)
{
Expand All @@ -530,7 +532,7 @@ public boolean validateSSTableAttachedIndexes(Collection<SSTableReader> sstables
*
* @param sstables the SSTables for which indexes must be built
*/
public void buildIndexesBlockingIncremental(Collection<SSTableReader> sstables)
public void buildSSTableAttachedIndexesBlocking(Collection<SSTableReader> sstables)
{
Set<Index> toBuild = indexes.values().stream().filter(Index::isSSTableAttached).collect(Collectors.toSet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ private StorageAttachedIndexWriter(IndexDescriptor indexDescriptor,
boolean perIndexComponentsOnly) throws IOException
{
this.indexDescriptor = indexDescriptor;
//this.indexes = indexes;
this.rowMapping = RowMapping.create(lifecycleNewTracker.opType());
this.perIndexWriters = indexes.stream().map(index -> indexDescriptor.newPerColumnIndexWriter(index,
lifecycleNewTracker,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.cassandra.index.sai.disk.format;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;

import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
Expand Down Expand Up @@ -115,6 +116,8 @@ PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex index,
*
* @param indexDescriptor The {@link IndexDescriptor} for the SSTable SAI index
* @param checksum {@code true} if the checksum should be tested as part of the validation
*
* @throws UncheckedIOException if there is a problem validating any on-disk component
*/
void validatePerSSTableIndexComponents(IndexDescriptor indexDescriptor, boolean checksum);

Expand All @@ -124,9 +127,11 @@ PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex index,
* @param indexDescriptor The {@link IndexDescriptor} for the SSTable SAI index
* @param indexContext The {@link IndexContext} holding the per-index information for the index
* @param checksum {@code true} if the checksum should be tested as part of the validation
*
* @throws UncheckedIOException if there is a problem validating any on-disk component
*/
void validatePerColumnIndexComponents(IndexDescriptor indexDescriptor, IndexContext indexContext, boolean checksum);

/**
* Returns the set of {@link IndexComponent} for the per-SSTable part of an index.
* This is a complete set of components that could exist on-disk. It does not imply that the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,4 @@ public static void validateChecksum(IndexInput input, @SuperCall Callable<Void>
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,4 @@ public static void validateChecksum(IndexInput input, @SuperCall Callable<Void>
}
}
}
}
}

0 comments on commit 9867c66

Please sign in to comment.