Skip to content

Commit

Permalink
Make CQLSSTableWriter to support building of SAI indexes
Browse files Browse the repository at this point in the history
patch by Stefan Miklosovic; reviewed by Caleb Rackliffe, Doug Rohrer for CASSANDRA-18714
  • Loading branch information
smiklosovic committed Jan 22, 2024
1 parent 9f5e45e commit 016dd6c
Show file tree
Hide file tree
Showing 26 changed files with 917 additions and 320 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
5.0-beta2
* Make CQLSSTableWriter to support building of SAI indexes (CASSANDRA-18714)
* Append additional JVM options when using JDK17+ (CASSANDRA-19001)
* Upgrade Python driver to 3.29.0 (CASSANDRA-19245)
* Creating a SASI index after creating an SAI index does not break secondary index queries (CASSANDRA-18939)
Expand Down
18 changes: 17 additions & 1 deletion src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.fql.FullQueryLoggerOptions;
import org.apache.cassandra.index.internal.CassandraIndex;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.service.StartupChecks.StartupCheckType;
import org.apache.cassandra.utils.StorageCompatibilityMode;
Expand Down Expand Up @@ -1164,7 +1165,22 @@ public enum MemtableAllocationType
unslabbed_heap_buffers_logged,
heap_buffers,
offheap_buffers,
offheap_objects
offheap_objects;

public BufferType toBufferType()
{
switch (this)
{
case unslabbed_heap_buffers:
case heap_buffers:
return BufferType.ON_HEAP;
case offheap_buffers:
case offheap_objects:
return BufferType.OFF_HEAP;
default:
throw new AssertionError();
}
}
}

public enum DiskFailurePolicy
Expand Down
89 changes: 60 additions & 29 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,22 +200,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
are finished. By having flushExecutor size the same size as each of the perDiskflushExecutors we make sure we can
have that many flushes going at the same time.
*/
private static final ExecutorPlus flushExecutor = executorFactory()
.withJmxInternal()
.pooled("MemtableFlushWriter", getFlushWriters());
private static final ExecutorPlus flushExecutor = DatabaseDescriptor.isDaemonInitialized()
? executorFactory().withJmxInternal().pooled("MemtableFlushWriter", getFlushWriters())
: null;

// post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed
private static final ExecutorPlus postFlushExecutor = executorFactory()
.withJmxInternal()
.sequential("MemtablePostFlush");
private static final ExecutorPlus postFlushExecutor = DatabaseDescriptor.isDaemonInitialized()
? executorFactory().withJmxInternal().sequential("MemtablePostFlush")
: null;

private static final ExecutorPlus reclaimExecutor = executorFactory()
.withJmxInternal()
.sequential("MemtableReclaimMemory");
private static final ExecutorPlus reclaimExecutor = DatabaseDescriptor.isDaemonInitialized()
? executorFactory().withJmxInternal().sequential("MemtableReclaimMemory")
: null;

private static final PerDiskFlushExecutors perDiskflushExecutors = new PerDiskFlushExecutors(DatabaseDescriptor.getFlushWriters(),
DatabaseDescriptor.getNonLocalSystemKeyspacesDataFileLocations(),
DatabaseDescriptor.useSpecificLocationForLocalSystemData());
private static final PerDiskFlushExecutors perDiskflushExecutors = DatabaseDescriptor.isDaemonInitialized()
? new PerDiskFlushExecutors(DatabaseDescriptor.getFlushWriters(),
DatabaseDescriptor.getNonLocalSystemKeyspacesDataFileLocations(),
DatabaseDescriptor.useSpecificLocationForLocalSystemData())
: null;

/**
* Reason for initiating a memtable flush.
Expand Down Expand Up @@ -394,7 +396,9 @@ public void reload()
indexManager.reload();

memtableFactory = metadata().params.memtable.factory();
switchMemtableOrNotify(FlushReason.SCHEMA_CHANGE, Memtable::metadataUpdated);

if (DatabaseDescriptor.isDaemonInitialized())
switchMemtableOrNotify(FlushReason.SCHEMA_CHANGE, Memtable::metadataUpdated);
}

public static Runnable getBackgroundCompactionTaskSubmitter()
Expand Down Expand Up @@ -855,26 +859,53 @@ public void loadNewSSTables()
sstableImporter.importNewSSTables(options);
}

/**
* #{@inheritDoc}
*/
public synchronized List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify, boolean copyData)
@Override
public List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired,
boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches,
boolean extendedVerify, boolean copyData)
{
SSTableImporter.Options options = SSTableImporter.Options.options(srcPaths)
.resetLevel(resetLevel)
.clearRepaired(clearRepaired)
.verifySSTables(verifySSTables)
.verifyTokens(verifyTokens)
.invalidateCaches(invalidateCaches)
.extendedVerify(extendedVerify)
.copyData(copyData).build();

return sstableImporter.importNewSSTables(options);
return sstableImporter.importNewSSTables(SSTableImporter.Options.options(srcPaths)
.resetLevel(resetLevel)
.clearRepaired(clearRepaired)
.verifySSTables(verifySSTables)
.verifyTokens(verifyTokens)
.invalidateCaches(invalidateCaches)
.extendedVerify(extendedVerify)
.copyData(copyData).build());
}

public List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify)
@Override
public List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired,
boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches,
boolean extendedVerify)
{
return importNewSSTables(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify, false);
return sstableImporter.importNewSSTables(SSTableImporter.Options.options(srcPaths)
.resetLevel(resetLevel)
.clearRepaired(clearRepaired)
.verifySSTables(verifySSTables)
.verifyTokens(verifyTokens)
.invalidateCaches(invalidateCaches)
.extendedVerify(extendedVerify)
.build());
}

@Override
public List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired,
boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches,
boolean extendedVerify, boolean copyData, boolean failOnMissingIndex,
boolean validateIndexChecksum)
{
return sstableImporter.importNewSSTables(SSTableImporter.Options.options(srcPaths)
.resetLevel(resetLevel)
.clearRepaired(clearRepaired)
.verifySSTables(verifySSTables)
.verifyTokens(verifyTokens)
.invalidateCaches(invalidateCaches)
.extendedVerify(extendedVerify)
.failOnMissingIndex(failOnMissingIndex)
.validateIndexChecksum(validateIndexChecksum)
.copyData(copyData)
.build());
}

Descriptor getUniqueDescriptorFor(Descriptor descriptor, File targetDirectory)
Expand Down
41 changes: 35 additions & 6 deletions src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,12 @@ public interface ColumnFamilyStoreMBean
/** @deprecated See CASSANDRA-16407 */
@Deprecated(since = "4.0")
public List<String> importNewSSTables(Set<String> srcPaths,
boolean resetLevel,
boolean clearRepaired,
boolean verifySSTables,
boolean verifyTokens,
boolean invalidateCaches,
boolean extendedVerify);
boolean resetLevel,
boolean clearRepaired,
boolean verifySSTables,
boolean verifyTokens,
boolean invalidateCaches,
boolean extendedVerify);

/**
* Load new sstables from the given directory
Expand All @@ -210,6 +210,8 @@ public List<String> importNewSSTables(Set<String> srcPaths,
*
* @return list of failed import directories
*/
/** @deprecated See CASSANDRA-18714 */
@Deprecated(since = "5.0")
public List<String> importNewSSTables(Set<String> srcPaths,
boolean resetLevel,
boolean clearRepaired,
Expand All @@ -219,6 +221,33 @@ public List<String> importNewSSTables(Set<String> srcPaths,
boolean extendedVerify,
boolean copyData);

/**
* Load new sstables from the given directory
*
* @param srcPaths the path to the new sstables - if it is an empty set, the data directories will be scanned
* @param resetLevel if the level should be reset to 0 on the new sstables
* @param clearRepaired if repaired info should be wiped from the new sstables
* @param verifySSTables if the new sstables should be verified that they are not corrupt
* @param verifyTokens if the tokens in the new sstables should be verified that they are owned by the current node
* @param invalidateCaches if row cache should be invalidated for the keys in the new sstables
* @param extendedVerify if we should run an extended verify checking all values in the new sstables
* @param copyData if we should copy data from source paths instead of moving them
* @param validateIndexChecksum if we should also validate checksum for SAI indexes
* @param failOnMissingIndex if loading should fail when SSTables do not contain built SAI indexes too
*
* @return list of failed import directories
*/
public List<String> importNewSSTables(Set<String> srcPaths,
boolean resetLevel,
boolean clearRepaired,
boolean verifySSTables,
boolean verifyTokens,
boolean invalidateCaches,
boolean extendedVerify,
boolean copyData,
boolean failOnMissingIndex,
boolean validateIndexChecksum);

/** @deprecated See CASSANDRA-6719 */
@Deprecated(since = "4.0")
public void loadNewSSTables();
Expand Down

0 comments on commit 016dd6c

Please sign in to comment.