Skip to content
Merged
29 changes: 25 additions & 4 deletions src/java/org/apache/cassandra/index/sai/IndexContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.cassandra.index.sai.utils.TypeUtil;
import org.apache.cassandra.index.sai.view.IndexViewManager;
import org.apache.cassandra.index.sai.view.View;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.ColumnMetadata;
Expand Down Expand Up @@ -152,6 +153,8 @@ public class IndexContext

private final int maxTermSize;

private volatile boolean dropped = false;

public IndexContext(@Nonnull String keyspace,
@Nonnull String table,
@Nonnull TableId tableId,
Expand All @@ -173,7 +176,6 @@ public IndexContext(@Nonnull String keyspace,
this.viewManager = new IndexViewManager(this);
this.validator = TypeUtil.cellValueType(column, indexType);
this.cfs = cfs;

this.primaryKeyFactory = Version.latest().onDiskFormat().newPrimaryKeyFactory(clusteringComparator);

if (config != null)
Expand Down Expand Up @@ -564,9 +566,12 @@ public long estimatedOffHeapMemIndexMemoryUsed()
/**
* @return A set of SSTables which have attached to them invalid index components.
*/
public Set<SSTableContext> onSSTableChanged(Collection<SSTableReader> oldSSTables, Collection<SSTableContext> newSSTables, boolean validate)
public Set<SSTableContext> onSSTableChanged(Collection<SSTableReader> oldSSTables,
Collection<SSTableReader> newSSTables,
Collection<SSTableContext> newContexts,
boolean validate)
{
return viewManager.update(oldSSTables, newSSTables, validate);
return viewManager.update(oldSSTables, newSSTables, newContexts, validate);
}

public ColumnMetadata getDefinition()
Expand Down Expand Up @@ -656,7 +661,12 @@ public void prepareSSTablesForRebuild(Collection<SSTableReader> sstablesToRebuil

public boolean isIndexed()
{
return config != null;
return config != null && !dropped;
}

public boolean isDropped()
{
return dropped;
}

/**
Expand All @@ -675,6 +685,7 @@ public boolean isAnalyzed()
*/
public void invalidate(boolean obsolete)
{
dropped = true;
liveMemtables.clear();
viewManager.invalidate(obsolete);
indexMetrics.release();
Expand All @@ -692,6 +703,16 @@ public ConcurrentMap<Memtable, MemtableIndex> getLiveMemtables()
return liveMemtables;
}

public @Nullable MemtableIndex getMemtableIndex(Memtable memtable)
{
return liveMemtables.get(memtable);
}

public @Nullable SSTableIndex getSSTableIndex(Descriptor descriptor)
{
return getView().getSSTableIndex(descriptor);
}

public boolean supports(Operator op)
{
if (op.isLike() || op == Operator.LIKE) return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ private synchronized List<SSTableReader> findNonIndexedSSTables(ColumnFamilyStor
// 1. The current view does not contain the SSTable
// 2. The SSTable is not marked compacted
// 3. The column index does not have a completion marker
if (!view.containsSSTable(sstable)
if (!view.hasIndexForSSTable(sstable)
&& !sstable.isMarkedCompacted()
&& !IndexDescriptor.isIndexBuildCompleteOnDisk(sstable, indexContext))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,6 +90,8 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons

private final SSTableContextManager contextManager;



StorageAttachedIndexGroup(ColumnFamilyStore baseCfs)
{
this.baseCfs = baseCfs;
Expand Down Expand Up @@ -302,7 +305,7 @@ public void handleNotification(INotification notification, Object sender)
// Avoid validation for index files just written following Memtable flush. ZCS streaming should
// validate index checksum.
boolean validate = notice.fromStreaming() || !notice.memtable().isPresent();
onSSTableChanged(Collections.emptySet(), notice.added, indices, validate);
onSSTableChanged(Collections.emptySet(), Lists.newArrayList(notice.added), indices, validate);
}
else if (notification instanceof SSTableListChangedNotification)
{
Expand Down Expand Up @@ -342,7 +345,7 @@ void prepareIndexSSTablesForRebuild(Collection<SSTableReader> ss, StorageAttache
* @return the set of column indexes that were marked as non-queryable as a result of their per-SSTable index
* files being corrupt or being unable to successfully update their views
*/
public synchronized Set<StorageAttachedIndex> onSSTableChanged(Collection<SSTableReader> removed, Iterable<SSTableReader> added,
public synchronized Set<StorageAttachedIndex> onSSTableChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added,
Set<StorageAttachedIndex> indexes, boolean validate)
{
Optional<Set<SSTableContext>> optValid = contextManager.update(removed, added, validate, indices);
Expand All @@ -357,7 +360,7 @@ public synchronized Set<StorageAttachedIndex> onSSTableChanged(Collection<SSTabl

for (StorageAttachedIndex index : indexes)
{
Set<SSTableContext> invalid = index.getIndexContext().onSSTableChanged(removed, optValid.get(), validate);
Set<SSTableContext> invalid = index.getIndexContext().onSSTableChanged(removed, added, optValid.get(), validate);

if (!invalid.isEmpty())
{
Expand Down
34 changes: 25 additions & 9 deletions src/java/org/apache/cassandra/index/sai/plan/QueryController.java
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ private KeyRangeIterator buildIterator(Expression predicate)
* Invocations are memorized - multiple calls for the same context return the same view.
* The views are kept for the lifetime of this {@code QueryController}.
*/
QueryView getQueryView(IndexContext context)
QueryView getQueryView(IndexContext context) throws QueryView.Builder.MissingIndexException
{
return queryViews.computeIfAbsent(context,
c -> new QueryView.Builder(c, mergeRange, queryContext).build());
Expand Down Expand Up @@ -586,6 +586,13 @@ public CloseableIterator<PrimaryKeyWithSortKey> getTopKRows(Expression predicate
sstableResults.addAll(memtableResults);
return MergeIterator.getNonReducingCloseable(sstableResults, orderer.getComparator());
}
catch (QueryView.Builder.MissingIndexException e)
{
if (orderer.context.isDropped())
throw invalidRequest(TopKProcessor.INDEX_MAY_HAVE_BEEN_DROPPED);
else
throw new IllegalStateException("Index not found but hasn't been dropped", e);
}
catch (Throwable t)
{
FileUtils.closeQuietly(memtableResults);
Expand Down Expand Up @@ -648,15 +655,16 @@ private List<PrimaryKey> materializeKeys(KeyRangeIterator source)
private CloseableIterator<PrimaryKeyWithSortKey> getTopKRows(List<PrimaryKey> sourceKeys, int softLimit)
{
Tracing.logAndTrace(logger, "SAI predicates produced {} keys", sourceKeys.size());
QueryView view = getQueryView(orderer.context);
var memtableResults = view.memtableIndexes.stream()
.map(index -> index.orderResultsBy(queryContext,
sourceKeys,
orderer,
softLimit))
.collect(Collectors.toList());
List<CloseableIterator<PrimaryKeyWithSortKey>> memtableResults = null;
try
{
QueryView view = getQueryView(orderer.context);
memtableResults = view.memtableIndexes.stream()
.map(index -> index.orderResultsBy(queryContext,
sourceKeys,
orderer,
softLimit))
.collect(Collectors.toList());
var totalRows = view.getTotalSStableRows();
SSTableSearcher ssTableSearcher = index -> index.orderResultsBy(queryContext,
sourceKeys,
Expand All @@ -667,9 +675,17 @@ private CloseableIterator<PrimaryKeyWithSortKey> getTopKRows(List<PrimaryKey> so
sstableScoredPrimaryKeyIterators.addAll(memtableResults);
return MergeIterator.getNonReducingCloseable(sstableScoredPrimaryKeyIterators, orderer.getComparator());
}
catch (QueryView.Builder.MissingIndexException e)
{
if (orderer.context.isDropped())
throw invalidRequest(TopKProcessor.INDEX_MAY_HAVE_BEEN_DROPPED);
else
throw new IllegalStateException("Index not found but hasn't been dropped", e);
}
catch (Throwable t)
{
FileUtils.closeQuietly(memtableResults);
if (memtableResults != null)
FileUtils.closeQuietly(memtableResults);
throw t;
}

Expand Down
Loading