diff --git a/src/java/org/apache/cassandra/index/sai/IndexContext.java b/src/java/org/apache/cassandra/index/sai/IndexContext.java index e699fc6da5a9..7820400a6c25 100644 --- a/src/java/org/apache/cassandra/index/sai/IndexContext.java +++ b/src/java/org/apache/cassandra/index/sai/IndexContext.java @@ -83,6 +83,7 @@ import org.apache.cassandra.index.sai.utils.TypeUtil; import org.apache.cassandra.index.sai.view.IndexViewManager; import org.apache.cassandra.index.sai.view.View; +import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.schema.ColumnMetadata; @@ -152,6 +153,8 @@ public class IndexContext private final int maxTermSize; + private volatile boolean dropped = false; + public IndexContext(@Nonnull String keyspace, @Nonnull String table, @Nonnull TableId tableId, @@ -173,7 +176,6 @@ public IndexContext(@Nonnull String keyspace, this.viewManager = new IndexViewManager(this); this.validator = TypeUtil.cellValueType(column, indexType); this.cfs = cfs; - this.primaryKeyFactory = Version.latest().onDiskFormat().newPrimaryKeyFactory(clusteringComparator); if (config != null) @@ -564,9 +566,12 @@ public long estimatedOffHeapMemIndexMemoryUsed() /** * @return A set of SSTables which have attached to them invalid index components. 
*/ - public Set onSSTableChanged(Collection oldSSTables, Collection newSSTables, boolean validate) + public Set onSSTableChanged(Collection oldSSTables, + Collection newSSTables, + Collection newContexts, + boolean validate) { - return viewManager.update(oldSSTables, newSSTables, validate); + return viewManager.update(oldSSTables, newSSTables, newContexts, validate); } public ColumnMetadata getDefinition() @@ -656,7 +661,12 @@ public void prepareSSTablesForRebuild(Collection sstablesToRebuil public boolean isIndexed() { - return config != null; + return config != null && !dropped; + } + + public boolean isDropped() + { + return dropped; } /** @@ -675,6 +685,7 @@ public boolean isAnalyzed() */ public void invalidate(boolean obsolete) { + dropped = true; liveMemtables.clear(); viewManager.invalidate(obsolete); indexMetrics.release(); @@ -692,6 +703,16 @@ public ConcurrentMap getLiveMemtables() return liveMemtables; } + public @Nullable MemtableIndex getMemtableIndex(Memtable memtable) + { + return liveMemtables.get(memtable); + } + + public @Nullable SSTableIndex getSSTableIndex(Descriptor descriptor) + { + return getView().getSSTableIndex(descriptor); + } + public boolean supports(Operator op) { if (op.isLike() || op == Operator.LIKE) return false; diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java index fd2ab5795dce..a39bf2e544b4 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java @@ -801,7 +801,7 @@ private synchronized List findNonIndexedSSTables(ColumnFamilyStor // 1. The current view does not contain the SSTable // 2. The SSTable is not marked compacted // 3. 
The column index does not have a completion marker - if (!view.containsSSTable(sstable) + if (!view.hasIndexForSSTable(sstable) && !sstable.isMarkedCompacted() && !IndexDescriptor.isIndexBuildCompleteOnDisk(sstable, indexContext)) { diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java index 4e9d82c3389b..d6cf0af7a78f 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java @@ -34,6 +34,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,6 +90,8 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons private final SSTableContextManager contextManager; + + StorageAttachedIndexGroup(ColumnFamilyStore baseCfs) { this.baseCfs = baseCfs; @@ -302,7 +305,7 @@ public void handleNotification(INotification notification, Object sender) // Avoid validation for index files just written following Memtable flush. ZCS streaming should // validate index checksum. 
boolean validate = notice.fromStreaming() || !notice.memtable().isPresent(); - onSSTableChanged(Collections.emptySet(), notice.added, indices, validate); + onSSTableChanged(Collections.emptySet(), Lists.newArrayList(notice.added), indices, validate); } else if (notification instanceof SSTableListChangedNotification) { @@ -342,7 +345,7 @@ void prepareIndexSSTablesForRebuild(Collection ss, StorageAttache * @return the set of column indexes that were marked as non-queryable as a result of their per-SSTable index * files being corrupt or being unable to successfully update their views */ - public synchronized Set onSSTableChanged(Collection removed, Iterable added, + public synchronized Set onSSTableChanged(Collection removed, Collection added, Set indexes, boolean validate) { Optional> optValid = contextManager.update(removed, added, validate, indices); @@ -357,7 +360,7 @@ public synchronized Set onSSTableChanged(Collection invalid = index.getIndexContext().onSSTableChanged(removed, optValid.get(), validate); + Set invalid = index.getIndexContext().onSSTableChanged(removed, added, optValid.get(), validate); if (!invalid.isEmpty()) { diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java index 20191fb8d830..30920970be0d 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java @@ -442,7 +442,7 @@ private KeyRangeIterator buildIterator(Expression predicate) * Invocations are memorized - multiple calls for the same context return the same view. * The views are kept for the lifetime of this {@code QueryController}. 
*/ - QueryView getQueryView(IndexContext context) + QueryView getQueryView(IndexContext context) throws QueryView.Builder.MissingIndexException { return queryViews.computeIfAbsent(context, c -> new QueryView.Builder(c, mergeRange, queryContext).build()); @@ -586,6 +586,13 @@ public CloseableIterator getTopKRows(Expression predicate sstableResults.addAll(memtableResults); return MergeIterator.getNonReducingCloseable(sstableResults, orderer.getComparator()); } + catch (QueryView.Builder.MissingIndexException e) + { + if (orderer.context.isDropped()) + throw invalidRequest(TopKProcessor.INDEX_MAY_HAVE_BEEN_DROPPED); + else + throw new IllegalStateException("Index not found but hasn't been dropped", e); + } catch (Throwable t) { FileUtils.closeQuietly(memtableResults); @@ -648,15 +655,16 @@ private List materializeKeys(KeyRangeIterator source) private CloseableIterator getTopKRows(List sourceKeys, int softLimit) { Tracing.logAndTrace(logger, "SAI predicates produced {} keys", sourceKeys.size()); - QueryView view = getQueryView(orderer.context); - var memtableResults = view.memtableIndexes.stream() - .map(index -> index.orderResultsBy(queryContext, - sourceKeys, - orderer, - softLimit)) - .collect(Collectors.toList()); + List> memtableResults = null; try { + QueryView view = getQueryView(orderer.context); + memtableResults = view.memtableIndexes.stream() + .map(index -> index.orderResultsBy(queryContext, + sourceKeys, + orderer, + softLimit)) + .collect(Collectors.toList()); var totalRows = view.getTotalSStableRows(); SSTableSearcher ssTableSearcher = index -> index.orderResultsBy(queryContext, sourceKeys, @@ -667,9 +675,17 @@ private CloseableIterator getTopKRows(List so sstableScoredPrimaryKeyIterators.addAll(memtableResults); return MergeIterator.getNonReducingCloseable(sstableScoredPrimaryKeyIterators, orderer.getComparator()); } + catch (QueryView.Builder.MissingIndexException e) + { + if (orderer.context.isDropped()) + throw 
invalidRequest(TopKProcessor.INDEX_MAY_HAVE_BEEN_DROPPED); + else + throw new IllegalStateException("Index not found but hasn't been dropped", e); + } catch (Throwable t) { - FileUtils.closeQuietly(memtableResults); + if (memtableResults != null) + FileUtils.closeQuietly(memtableResults); throw t; } diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryView.java b/src/java/org/apache/cassandra/index/sai/plan/QueryView.java index 45f2481b99ca..b11f5acf8671 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryView.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryView.java @@ -18,6 +18,7 @@ package org.apache.cassandra.index.sai.plan; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -27,12 +28,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.googlecode.concurrenttrees.common.Iterables; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.index.sai.IndexContext; @@ -40,11 +41,13 @@ import org.apache.cassandra.index.sai.SSTableIndex; import org.apache.cassandra.index.sai.memory.MemtableIndex; import org.apache.cassandra.index.sai.utils.RangeUtil; +import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.NoSpamLogger; + public class QueryView implements AutoCloseable { final ColumnFamilyStore.RefViewFragment view; @@ -101,80 +104,158 @@ static class Builder this.queryContext = queryContext; } + /** + * Denotes a situation 
when there exists no index for an active memtable or sstable. + * This can happen e.g. when the index gets dropped while running the query. + */ + static class MissingIndexException extends RuntimeException + { + final IndexContext context; + + public MissingIndexException(IndexContext context, String message) + { + super(message); + this.context = context; + } + } + /** * Acquire references to all the memtables, memtable indexes, sstables, and sstable indexes required for the * given expression. - *

- * Will retry if the active sstables change concurrently. */ - protected QueryView build() + protected QueryView build() throws MissingIndexException { var referencedIndexes = new HashSet(); - long failingSince = -1L; + ColumnFamilyStore.RefViewFragment refViewFragment = null; + + // We must use the canonical view in order for the equality check for source sstable/memtable + // to work correctly. + var filter = RangeUtil.coversFullRing(range) + ? View.selectFunction(SSTableSet.CANONICAL) + : View.select(SSTableSet.CANONICAL, s -> RangeUtil.intersects(s, range)); + + try { + // Keeps track of which memtables we've already tried to match the index to. + // If we fail to match the index to the memtable for the first time, we have to retry + // because the memtable could be flushed and its index removed between the moment we + // got the view and the moment we did the lookup. + // If we get the same memtable in the view again, and there is no index, + // then the missing index is not due to a concurrent modification, but it doesn't contain indexed + // data, so we can ignore it. + var processedMemtables = new HashSet(); + + + var start = MonotonicClock.approxTime.now(); + Memtable unmatchedMemtable = null; + Descriptor unmatchedSStable = null; + + // This loop will spin only if there is a mismatch between the view managed by IndexViewManager + // and the view managed by Cassandra Tracker. Such a mismatch can happen at the moment when + // the sstable or memtable sets are updated, e.g. on flushes or compactions. The mismatch + // should last only until all Tracker notifications get processed by SAI + // (which doesn't involve I/O and should be very fast). We expect the mismatch to resolve in order + // of nanoseconds, but the timeout is large enough just in case of unpredictable performance hiccups. 
outer: - while (true) + while (!MonotonicClock.approxTime.isAfter(start + TimeUnit.MILLISECONDS.toNanos(2000))) { - // Prevent an infinite loop + // cleanup after the previous iteration if we're retrying + release(referencedIndexes); + release(refViewFragment); + + // Prevent exceeding the query timeout queryContext.checkpoint(); - // Acquire live memtable index and memtable references first to avoid missing an sstable due to flush. - // Copy the memtable indexes to avoid concurrent modification. - var memtableIndexes = new HashSet<>(indexContext.getLiveMemtables().values()); - - // We must use the canonical view in order for the equality check for source sstable/memtable - // to work correctly. - var filter = RangeUtil.coversFullRing(range) - ? View.selectFunction(SSTableSet.CANONICAL) - : View.select(SSTableSet.CANONICAL, s -> RangeUtil.intersects(s, range)); - var refViewFragment = cfs.selectAndReference(filter); - var memtables = Iterables.toSet(refViewFragment.memtables); - // Confirm that all the memtables associated with the memtable indexes we already have are still live. - // There might be additional memtables that are not associated with the expression because tombstones - // are not indexed. - for (MemtableIndex memtableIndex : memtableIndexes) + // Lock a consistent view of memtables and sstables. + // A consistent view is required for correctness of order by and vector queries. + refViewFragment = cfs.selectAndReference(filter); + var indexView = indexContext.getView(); + + // Lookup the indexes corresponding to memtables: + var memtableIndexes = new HashSet(); + for (Memtable memtable : refViewFragment.memtables) { - if (!memtables.contains(memtableIndex.getMemtable())) + // Empty memtables have no index but that's not a problem, we can ignore them. 
+ if (memtable.getLiveDataSize() == 0) + continue; + + MemtableIndex index = indexContext.getMemtableIndex(memtable); + if (index != null) + { + memtableIndexes.add(index); + } + else if (indexContext.isDropped()) { + // Index was dropped deliberately by the user. + // We cannot recover here. refViewFragment.release(); + throw new MissingIndexException(indexContext, "Index " + indexContext.getIndexName() + + " not found for memtable: " + memtable); + } + else if (!processedMemtables.contains(memtable)) + { + // We can end up here if a flush happened right after we referenced the refViewFragment + // but before looking up the memtable index. + // In that case, we need to retry with the updated view + // (we expect the updated view to not contain this memtable). + + // Remember this memtable to protect from infinite looping in case we have a permanent + // inconsistency between the index set and the memtable set. + processedMemtables.add(memtable); + + unmatchedMemtable = memtable; continue outer; } + // If the memtable was non-empty, the index context hasn't been dropped, but the + // index doesn't exist on the second attempt, then this means there is no indexed data + // in that memtable. In this case we just continue without it. + // Memtable indexes are created lazily, on the first insert, therefore a missing index + // is a normal situation. } - Set indexes = getIndexesForExpression(indexContext); - // Attempt to reference each of the indexes, and thn confirm that the sstable associated with the index - // is in the refViewFragment. If it isn't in the refViewFragment, we will get incorrect results, so - // we release the indexes and refViewFragment and try again. 
- for (SSTableIndex index : indexes) + // Lookup and reference the indexes corresponding to the sstables: + for (SSTableReader sstable : refViewFragment.sstables) { - var success = index.reference(); - if (success) - referencedIndexes.add(index); + // If the IndexViewManager never saw this sstable, then we need to spin. + // Let's hope in the next iteration we get the indexView based on the same sstable set + // as the refViewFragment. + if (!indexView.containsSSTable(sstable)) + { + if (MonotonicClock.approxTime.isAfter(start + 100)) + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.SECONDS, + "Spinning trying to get the index for sstable {} because index view is out of sync", sstable.descriptor); + + unmatchedSStable = sstable.descriptor; + continue outer; + } - if (!success || !refViewFragment.sstables.contains(index.getSSTable())) + SSTableIndex index = indexView.getSSTableIndex(sstable.descriptor); + + // The IndexViewManager got the update about this sstable, but there is no index for the sstable + // (e.g. index was dropped or got corrupt, etc.). In this case retrying won't fix it. + if (index == null) + throw new MissingIndexException(indexContext, "Index " + indexContext.getIndexName() + + " not found for sstable: " + sstable.descriptor); + + if (!indexInRange(index)) + continue; + + // It is unlikely but possible the index got unreferenced just between the moment we grabbed the + // refViewFragment and getting here. In that case we won't be able to reference it and we have + // to retry. 
+ if (!index.reference()) { - referencedIndexes.forEach(SSTableIndex::release); - referencedIndexes.clear(); - refViewFragment.release(); - // Log about the failures - if (failingSince <= 0) - { - failingSince = MonotonicClock.approxTime.now(); - } - else if (MonotonicClock.approxTime.now() - failingSince > TimeUnit.MILLISECONDS.toNanos(100)) - { - failingSince = MonotonicClock.approxTime.now(); - if (success) - NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.SECONDS, - "Spinning trying to capture index reader for {}, but it was released.", index); - else - NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.SECONDS, - "Spinning trying to capture readers for {}, but : {}, ", refViewFragment.sstables, index.getSSTable()); - } + if (MonotonicClock.approxTime.isAfter(start + 100)) + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.SECONDS, + "Spinning trying to get the index for sstable {} because index was released", sstable.descriptor); + + unmatchedSStable = sstable.descriptor; continue outer; } + + referencedIndexes.add(index); } // freeze referencedIndexes and memtableIndexes, so we can safely give access to them @@ -185,6 +266,24 @@ else if (MonotonicClock.approxTime.now() - failingSince > TimeUnit.MILLISECONDS. Collections.unmodifiableSet(memtableIndexes), indexContext); } + + + if (unmatchedMemtable != null) + throw new MissingIndexException(indexContext, "Index " + indexContext.getIndexName() + + " not found for memtable " + unmatchedMemtable); + if (unmatchedSStable != null) + throw new MissingIndexException(indexContext, "Index " + indexContext.getIndexName() + + " not found for sstable " + unmatchedSStable); + + // This should be unreachable, because whenever we retry, we always set unmatchedMemtable + // or unmatchedSSTable, so we'd log a better message above. 
+ throw new MissingIndexException(indexContext, "Failed to build QueryView for index " + indexContext.getIndexName()); + } + catch (MissingIndexException e) + { + release(referencedIndexes); + release(refViewFragment); + throw e; } finally { @@ -200,16 +299,17 @@ else if (MonotonicClock.approxTime.now() - failingSince > TimeUnit.MILLISECONDS. } } - /** - * Get the index - */ - private Set getIndexesForExpression(IndexContext indexContext) + private void release(ColumnFamilyStore.RefViewFragment refViewFragment) { - if (!indexContext.isIndexed()) - throw new IllegalArgumentException("Expression is not indexed"); + if (refViewFragment != null) + refViewFragment.release(); + } - // Get all the indexes in the range. - return indexContext.getView().getIndexes().stream().filter(this::indexInRange).collect(Collectors.toSet()); + private void release(Collection indexes) + { + for (var index : indexes) + index.release(); + indexes.clear(); } // I've removed the concept of "most selective index" since we don't actually have per-sstable @@ -227,5 +327,4 @@ private boolean indexInRange(SSTableIndex index) return range.left.compareTo(sstable.last) <= 0 && (range.right.isMinimum() || sstable.first.compareTo(range.right) <= 0); } } - } diff --git a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java index cee2d69978fd..95fd9b820393 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java @@ -67,6 +67,8 @@ import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; + public class StorageAttachedIndexSearcher implements Index.Searcher { private static final Logger logger = LoggerFactory.getLogger(StorageAttachedIndexSearcher.class); 
@@ -97,33 +99,55 @@ public ReadCommand command() @SuppressWarnings("unchecked") public UnfilteredPartitionIterator search(ReadExecutionController executionController) throws RequestTimeoutException { - try + int retries = 0; + while (true) { - FilterTree filterTree = analyzeFilter(); - Plan plan = controller.buildPlan(); - Iterator keysIterator = controller.buildIterator(plan); + try + { + FilterTree filterTree = analyzeFilter(); + Plan plan = controller.buildPlan(); + Iterator keysIterator = controller.buildIterator(plan); - // Can't check for `command.isTopK()` because the planner could optimize sorting out - Orderer ordering = plan.ordering(); - if (ordering != null) + // Can't check for `command.isTopK()` because the planner could optimize sorting out + Orderer ordering = plan.ordering(); + if (ordering != null) + { + assert !(keysIterator instanceof KeyRangeIterator); + var scoredKeysIterator = (CloseableIterator) keysIterator; + var result = new ScoreOrderedResultRetriever(scoredKeysIterator, filterTree, controller, + executionController, queryContext, command.limits().count()); + return (UnfilteredPartitionIterator) new TopKProcessor(command).filter(result); + } + else + { + assert keysIterator instanceof KeyRangeIterator; + return new ResultRetriever((KeyRangeIterator) keysIterator, filterTree, controller, executionController, queryContext); + } + } + catch (QueryView.Builder.MissingIndexException e) { - assert !(keysIterator instanceof KeyRangeIterator); - var scoredKeysIterator = (CloseableIterator) keysIterator; - var result = new ScoreOrderedResultRetriever(scoredKeysIterator, filterTree, controller, - executionController, queryContext, command.limits().count()); - return (UnfilteredPartitionIterator) new TopKProcessor(command).filter(result); + // If an index was dropped while we were preparing the plan or between preparing the plan + // and creating the result retriever, we can retry without that index, + // because there may be other indexes that 
could be used to run the query. + // And if there are no good indexes left, we'd get a good contextual request error message. + if (e.context.isDropped() && retries < 8) + { + logger.debug("Index " + e.context.getIndexName() + " dropped while preparing the query plan. Retrying."); + retries++; + continue; + } + + // If we end up here, this is either a bug or a problem with an index (corrupted / missing components?). + controller.abort(); + logger.error("Index not found", e); + throw invalidRequest("Index missing or corrupt: " + e.context.getIndexName()); } - else + catch (Throwable t) { - assert keysIterator instanceof KeyRangeIterator; - return new ResultRetriever((KeyRangeIterator) keysIterator, filterTree, controller, executionController, queryContext); + controller.abort(); + throw t; } } - catch (Throwable t) - { - controller.abort(); - throw t; - } } diff --git a/src/java/org/apache/cassandra/index/sai/view/IndexViewManager.java b/src/java/org/apache/cassandra/index/sai/view/IndexViewManager.java index d7fb96f6e118..3546bf7cbae0 100644 --- a/src/java/org/apache/cassandra/index/sai/view/IndexViewManager.java +++ b/src/java/org/apache/cassandra/index/sai/view/IndexViewManager.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -52,16 +53,17 @@ public class IndexViewManager private final IndexContext context; private final AtomicReference view = new AtomicReference<>(); + public IndexViewManager(IndexContext context) { - this(context, Collections.emptySet()); + this(context, Collections.emptySet(), Collections.emptySet()); } @VisibleForTesting - IndexViewManager(IndexContext context, Collection indices) + IndexViewManager(IndexContext context, Collection sstables, Collection indices) { this.context = context; - this.view.set(new View(context, indices)); + this.view.set(new 
View(context, sstables, indices)); } public View getView() @@ -73,12 +75,16 @@ public View getView() * Replaces old SSTables with new by creating new immutable view. * * @param oldSSTables A set of SSTables to remove. + * @param newSSTables A set of SSTables added in Cassandra. * @param newSSTableContexts A set of SSTableContexts to add to tracker. * @param validate if true, per-column index files' header and footer will be validated. * * @return A set of SSTables which have attached to them invalid index components. */ - public Set update(Collection oldSSTables, Collection newSSTableContexts, boolean validate) + public Set update(Collection oldSSTables, + Collection newSSTables, + Collection newSSTableContexts, + boolean validate) { // Valid indexes on the left and invalid SSTable contexts on the right... Pair, Set> indexes = context.getBuiltIndexes(newSSTableContexts, validate); @@ -87,13 +93,19 @@ public Set update(Collection oldSSTables, Collect Map newViewIndexes = new HashMap<>(); Collection releasableIndexes = new ArrayList<>(); Collection toRemove = new HashSet<>(oldSSTables); - + do { currentView = view.get(); newViewIndexes.clear(); releasableIndexes.clear(); + Set sstables = new HashSet<>(currentView.getSSTables()); + for (SSTableReader sstable : oldSSTables) + sstables.remove(sstable.descriptor); + for (SSTableReader sstable : newSSTables) + sstables.add(sstable.descriptor); + for (SSTableIndex sstableIndex : currentView) { // When aborting early open transaction, toRemove may have the same sstable files as newSSTableContexts, @@ -111,7 +123,7 @@ public Set update(Collection oldSSTables, Collect addOrUpdateSSTableIndex(sstableIndex, newViewIndexes, releasableIndexes); } - newView = new View(context, newViewIndexes.values()); + newView = new View(context, sstables, newViewIndexes.values()); } while (!view.compareAndSet(currentView, newView)); @@ -145,19 +157,24 @@ private static void addOrUpdateSSTableIndex(SSTableIndex ssTableIndex, Map 
sstablesToRebuild) { - View currentView = view.get(); - Set toRemove = new HashSet<>(sstablesToRebuild); - for (SSTableIndex index : currentView) + View oldView, newView; + Set indexesToRemove; + do { - SSTableReader sstable = index.getSSTable(); - if (!toRemove.contains(sstable)) - continue; - - index.release(); + oldView = view.get(); + indexesToRemove = oldView.getIndexes() + .stream() + .filter(index -> toRemove.contains(index.getSSTable())) + .collect(Collectors.toSet()); + var newIndexes = new HashSet<>(oldView.getIndexes()); + newIndexes.removeAll(indexesToRemove); + newView = new View(context, oldView.getSSTables(), newIndexes); } + while (!view.compareAndSet(oldView, newView)); - update(toRemove, Collections.emptyList(), false); + for (SSTableIndex index : indexesToRemove) + index.release(); } /** @@ -169,9 +186,15 @@ public void prepareSSTablesForRebuild(Collection sstablesToRebuil */ public void invalidate(boolean indexWasDropped) { - View previousView = view.getAndSet(new View(context, Collections.emptyList())); + View oldView, newView; + do + { + oldView = view.get(); + newView = new View(context, oldView.getSSTables(), Collections.emptySet()); + + } while (!view.compareAndSet(oldView, newView)); - for (SSTableIndex index : previousView) + for (SSTableIndex index : oldView) { if (indexWasDropped) index.markIndexDropped(); diff --git a/src/java/org/apache/cassandra/index/sai/view/View.java b/src/java/org/apache/cassandra/index/sai/view/View.java index d8852258200f..d62bdbef6077 100644 --- a/src/java/org/apache/cassandra/index/sai/view/View.java +++ b/src/java/org/apache/cassandra/index/sai/view/View.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; + import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.index.sai.IndexContext; @@ -39,15 +41,17 @@ public class View implements Iterable { + private final Set sstables; private final 
Map view; private final TermTree termTree; private final AbstractType keyValidator; private final IntervalTree> keyIntervalTree; - public View(IndexContext context, Collection indexes) + public View(IndexContext context, Collection sstables, Collection indexes) { this.view = new HashMap<>(); + this.sstables = new HashSet<>(sstables); this.keyValidator = context.keyValidator(); AbstractType validator = context.getValidator(); @@ -94,6 +98,11 @@ public Iterator iterator() return view.values().iterator(); } + public Collection getSSTables() + { + return sstables; + } + public Collection getIndexes() { return view.values(); @@ -101,7 +110,7 @@ public Collection getIndexes() public boolean containsSSTable(SSTableReader sstable) { - return view.containsKey(sstable.descriptor); + return sstables.contains(sstable.descriptor); } public int size() @@ -109,6 +118,16 @@ public int size() return view.size(); } + public @Nullable SSTableIndex getSSTableIndex(Descriptor descriptor) + { + return view.get(descriptor); + } + + public boolean hasIndexForSSTable(SSTableReader sstable) + { + return view.containsKey(sstable.descriptor); + } + /** * This is required since IntervalTree doesn't support custom Comparator * implementations and relied on items to be comparable which "raw" keys are not. 
diff --git a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java index ded14e3c82a9..696ce6cd0fe6 100644 --- a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java +++ b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java @@ -23,6 +23,7 @@ import javax.annotation.Nullable; import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.io.sstable.format.SSTableReader; diff --git a/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java b/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java index ebcde4e5411b..d0e1ea16e97b 100644 --- a/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java +++ b/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java @@ -192,10 +192,10 @@ public void testIndexRebuildWhenAddingSStableViaRemoteReload() assertEmpty(execute("SELECT * FROM %s WHERE a=1")); assertEmpty(execute("SELECT * FROM %s WHERE c=1")); - // track sstable again: expect no rows to be read by index + // track sstable again: expect the query that needs the index cannot execute cfs.getTracker().addInitialSSTables(sstables); assertRows(execute("SELECT * FROM %s WHERE a=1"), row(1, 1, 1)); - assertEmpty(execute("SELECT * FROM %s WHERE c=1")); + assertInvalid("SELECT * FROM %s WHERE c=1"); // remote reload should trigger index rebuild cfs.getTracker().notifySSTablesChanged(Collections.emptySet(), sstables, OperationType.REMOTE_RELOAD, Optional.empty(), null); diff --git a/test/unit/org/apache/cassandra/index/sai/cql/DropIndexWhileQueryingTest.java b/test/unit/org/apache/cassandra/index/sai/cql/DropIndexWhileQueryingTest.java index 4535a59a03e4..302369290cf0 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/DropIndexWhileQueryingTest.java +++ 
b/test/unit/org/apache/cassandra/index/sai/cql/DropIndexWhileQueryingTest.java @@ -43,7 +43,7 @@ public void testDropIndexWhileQuerying() throws Throwable createIndex("CREATE CUSTOM INDEX ON %s(z) USING 'StorageAttachedIndex'"); waitForTableIndexesQueryable(); - injectIndexDrop("drop_index", indexName, true); + injectIndexDrop("drop_index", indexName, "buildPlan", true); execute("INSERT INTO %s (k, x, y, z) VALUES (?, ?, ?, ?)", "car", 0, "y0", "z0"); String query = "SELECT * FROM %s WHERE x IN (0, 1) OR (y IN ('Y0', 'Y1' ) OR z IN ('z1', 'z2'))"; @@ -51,6 +51,26 @@ public void testDropIndexWhileQuerying() throws Throwable assertInvalidMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE, query); } + @Test + public void testFallbackToAnotherIndex() throws Throwable + { + createTable("CREATE TABLE %s (k text PRIMARY KEY, x int, y text, z text)"); + + createIndex("CREATE CUSTOM INDEX ON %s(y) USING 'StorageAttachedIndex'"); + String indexName1 = createIndex("CREATE CUSTOM INDEX ON %s(x) USING 'StorageAttachedIndex'"); + String indexName2 = createIndex("CREATE CUSTOM INDEX ON %s(z) USING 'StorageAttachedIndex'"); + waitForTableIndexesQueryable(); + + injectIndexDrop("drop_index_1", indexName1, "buildIterator", true); + injectIndexDrop("drop_index_2", indexName2, "buildIterator", true); + + execute("INSERT INTO %s (k, x, y, z) VALUES (?, ?, ?, ?)", "k1", 0, "y0", "z0"); // match + execute("INSERT INTO %s (k, x, y, z) VALUES (?, ?, ?, ?)", "k2", 0, "y1", "z2"); // no match + execute("INSERT INTO %s (k, x, y, z) VALUES (?, ?, ?, ?)", "k3", 5, "y2", "z0"); // no match + String query = "SELECT * FROM %s WHERE x = 0 AND y = 'y0' AND z = 'z0'"; + assertRowCount(execute(query), 1); + } + // See CNDB-10535 @Test public void testDropVectorIndexWhileQuerying() throws Throwable @@ -59,7 +79,7 @@ public void testDropVectorIndexWhileQuerying() throws Throwable String indexName = createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex'"); 
waitForTableIndexesQueryable(); - injectIndexDrop("drop_index2", indexName, false); + injectIndexDrop("drop_index2", indexName, "buildPlan", false); execute("INSERT INTO %s (pk, str_val, val) VALUES (0, 'A', [1.0, 2.0, 3.0])"); @@ -68,9 +88,9 @@ public void testDropVectorIndexWhileQuerying() throws Throwable assertInvalidMessage(String.format(StatementRestrictions.NON_CLUSTER_ORDERING_REQUIRES_INDEX_MESSAGE, "val"), query); } - private static void injectIndexDrop(String injectionName, String indexName, boolean atEntry) throws Throwable + private static void injectIndexDrop(String injectionName, String indexName, String methodName, boolean atEntry) throws Throwable { - InvokePointBuilder invokePoint = newInvokePoint().onClass(QueryController.class).onMethod("buildPlan"); + InvokePointBuilder invokePoint = newInvokePoint().onClass(QueryController.class).onMethod(methodName); Injection injection = Injections.newCustom(injectionName) .add(atEntry ? invokePoint.atEntry() : invokePoint.atExit()) .add(ActionBuilder @@ -88,7 +108,7 @@ private static void injectIndexDrop(String injectionName, String indexName, bool @SuppressWarnings("unused") public static void dropIndexForBytemanInjections(String indexName) { - String fullQuery = String.format("DROP INDEX %s.%s", KEYSPACE, indexName); + String fullQuery = String.format("DROP INDEX IF EXISTS %s.%s", KEYSPACE, indexName); logger.info(fullQuery); schemaChange(fullQuery); } diff --git a/test/unit/org/apache/cassandra/index/sai/cql/NativeIndexDDLTest.java b/test/unit/org/apache/cassandra/index/sai/cql/NativeIndexDDLTest.java index 12b597e85ca8..ebda5a426430 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/NativeIndexDDLTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/NativeIndexDDLTest.java @@ -1036,6 +1036,8 @@ private void verifyRebuildIndexComponent(IndexContext numericIndexContext, (corruptionType != CorruptionType.REMOVED)) return; + logger.info("CORRUPTING: " + component + ", corruption type = " + 
corruptionType); + int rowCount = 2; // initial verification diff --git a/test/unit/org/apache/cassandra/index/sai/view/IndexViewManagerTest.java b/test/unit/org/apache/cassandra/index/sai/view/IndexViewManagerTest.java index 2d25b56ba1b4..81e2afaa2bbc 100644 --- a/test/unit/org/apache/cassandra/index/sai/view/IndexViewManagerTest.java +++ b/test/unit/org/apache/cassandra/index/sai/view/IndexViewManagerTest.java @@ -160,6 +160,7 @@ public void testConcurrentUpdate() throws Throwable .map(desc -> new Descriptor(new File(tmpDir), KEYSPACE, tableName, desc.id)) .map(desc -> desc.getFormat().getReaderFactory().open(desc)) .collect(Collectors.toList()); + assertThat(sstables).hasSize(4); List none = Collections.emptyList(); @@ -178,16 +179,19 @@ public void testConcurrentUpdate() throws Throwable initialIndexes.add(mockSSTableIndex); } - IndexViewManager tracker = new IndexViewManager(columnContext, initialIndexes); + IndexViewManager tracker = new IndexViewManager(columnContext, descriptors, initialIndexes); View initialView = tracker.getView(); assertEquals(2, initialView.size()); - List compacted = sstables.stream().skip(2).limit(1).map(s -> SSTableContext.create(s, loadDescriptor(s, store).perSSTableComponents())).collect(Collectors.toList()); - List flushed = sstables.stream().skip(3).limit(1).map(s -> SSTableContext.create(s, loadDescriptor(s, store).perSSTableComponents())).collect(Collectors.toList()); + List compacted = List.of(sstables.get(2)); + List flushed = List.of(sstables.get(3)); + + List compactedContexts = compacted.stream().map(s -> SSTableContext.create(s, loadDescriptor(s, store).perSSTableComponents())).collect(Collectors.toList()); + List flushedContexts = flushed.stream().map(s -> SSTableContext.create(s, loadDescriptor(s, store).perSSTableComponents())).collect(Collectors.toList()); // concurrently update from both flush and compaction - Future compaction = executor.submit(() -> tracker.update(initial, compacted, true)); - Future flush = 
executor.submit(() -> tracker.update(none, flushed, true)); + Future compaction = executor.submit(() -> tracker.update(initial, compacted, compactedContexts, true)); + Future flush = executor.submit(() -> tracker.update(none, flushed, flushedContexts, true)); FBUtilities.waitOnFutures(Arrays.asList(compaction, flush)); @@ -206,11 +210,11 @@ public void testConcurrentUpdate() throws Throwable initialContexts.forEach(group -> assertTrue(group.isCleanedUp())); // release compacted and flushed SSTableContext original and shared copies - compacted.forEach(SSTableContext::close); - flushed.forEach(SSTableContext::close); + compactedContexts.forEach(SSTableContext::close); + flushedContexts.forEach(SSTableContext::close); tracker.getView().getIndexes().forEach(SSTableIndex::release); - compacted.forEach(group -> assertTrue(group.isCleanedUp())); - flushed.forEach(group -> assertTrue(group.isCleanedUp())); + compactedContexts.forEach(group -> assertTrue(group.isCleanedUp())); + flushedContexts.forEach(group -> assertTrue(group.isCleanedUp())); } sstables.forEach(sstable -> sstable.selfRef().release()); executor.shutdown();