Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ protected PrimaryKey computeNext()
if (rowId == PostingList.END_OF_STREAM)
return endOfData();

return new PrimaryKeyWithSource(primaryKeyMap, rowId, searcherContext.minimumKey, searcherContext.maximumKey);
return primaryKeyMap.primaryKeyFromRowId(rowId, searcherContext.minimumKey, searcherContext.maximumKey);
}
catch (Throwable t)
{
Expand Down Expand Up @@ -160,20 +160,11 @@ private long getNextRowId() throws IOException
long segmentRowId;
if (needsSkipping)
{
long targetSstableRowId;
if (skipToToken instanceof PrimaryKeyWithSource
&& ((PrimaryKeyWithSource) skipToToken).getSourceSstableId().equals(primaryKeyMap.getSSTableId()))
long targetSstableRowId = primaryKeyMap.ceiling(skipToToken);
// skipToToken is larger than max token in token file
if (targetSstableRowId < 0)
{
targetSstableRowId = ((PrimaryKeyWithSource) skipToToken).getSourceRowId();
}
else
{
targetSstableRowId = primaryKeyMap.ceiling(skipToToken);
// skipToToken is larger than max token in token file
if (targetSstableRowId < 0)
{
return PostingList.END_OF_STREAM;
}
return PostingList.END_OF_STREAM;
}
int targetSegmentRowId = Math.toIntExact(targetSstableRowId - searcherContext.getSegmentRowIdOffset());
segmentRowId = postingList.advance(targetSegmentRowId);
Expand Down
19 changes: 19 additions & 0 deletions src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Closeable;
import java.io.IOException;

import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;

import org.apache.cassandra.index.sai.utils.PrimaryKey;
Expand Down Expand Up @@ -84,6 +85,24 @@ default void close() throws IOException
*/
PrimaryKey primaryKeyFromRowId(long sstableRowId);

/**
* Returns a {@link PrimaryKey} for a row Id
*
* Note: the lower and upper bounds are used to avoid reading the primary key from disk in the event
* that compared primary keys are in non-overlapping ranges. The ranges can be within the table, and must
* contain the row id. This requirement is not validated, as validation would remove the performance benefit
* of this optimization.
*
* @param sstableRowId the row Id to lookup
* @param lowerBound the inclusive lower bound of the primary key being created
* @param upperBound the inclusive upper bound of the primary key being created
* @return the {@link PrimaryKey} associated with the row Id
*/
default PrimaryKey primaryKeyFromRowId(long sstableRowId, @Nonnull PrimaryKey lowerBound, @Nonnull PrimaryKey upperBound)
{
return primaryKeyFromRowId(sstableRowId);
}

/**
* Returns a row Id for a {@link PrimaryKey}. If there is no such term, returns the `-(next row id) - 1` where
* `next row id` is the row id of the next greatest {@link PrimaryKey} in the map.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.disk.ModernResettableByteBuffersIndexOutput;
import org.apache.cassandra.index.sai.disk.PostingList;
import org.apache.cassandra.index.sai.disk.PrimaryKeyWithSource;
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.io.IndexInput;
Expand Down Expand Up @@ -181,10 +180,9 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version,
// We need to load eagerly to allow us to close the partition key map.
min = pkm.primaryKeyFromRowId(minSSTableRowId).loadDeferred();
max = pkm.primaryKeyFromRowId(maxSSTableRowId).loadDeferred();
this.minKey = pkm.primaryKeyFromRowId(minSSTableRowId, min, max).loadDeferred();
this.maxKey = pkm.primaryKeyFromRowId(maxSSTableRowId, min, max).loadDeferred();
}

this.minKey = new PrimaryKeyWithSource(min, sstableContext.sstable.getId(), minSSTableRowId, min, max);
this.maxKey = new PrimaryKeyWithSource(max, sstableContext.sstable.getId(), maxSSTableRowId, min, max);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,28 @@
* limitations under the License.
*/

package org.apache.cassandra.index.sai.disk;
package org.apache.cassandra.index.sai.disk.v2;

import io.github.jbellis.jvector.util.RamUsageEstimator;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.io.sstable.SSTableId;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;

public class PrimaryKeyWithSource implements PrimaryKey
class PrimaryKeyWithSource implements PrimaryKey
{
private final PrimaryKeyMap primaryKeyMap;
private final SSTableId<?> sourceSstableId;
private final long sourceRowId;
private PrimaryKey delegatePrimaryKey;
private PrimaryKeyMap primaryKeyMap;
private final PrimaryKey sourceSstableMinKey;
private final PrimaryKey sourceSstableMaxKey;

public PrimaryKeyWithSource(PrimaryKeyMap primaryKeyMap, long sstableRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
PrimaryKeyWithSource(PrimaryKeyMap primaryKeyMap, long sstableRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
{
this.primaryKeyMap = primaryKeyMap;
this.sourceSstableId = primaryKeyMap.getSSTableId();
Expand All @@ -45,20 +46,13 @@ public PrimaryKeyWithSource(PrimaryKeyMap primaryKeyMap, long sstableRowId, Prim
this.sourceSstableMaxKey = sourceSstableMaxKey;
}

public PrimaryKeyWithSource(PrimaryKey primaryKey, SSTableId<?> sourceSstableId, long sourceRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
{
this.delegatePrimaryKey = primaryKey;
this.primaryKeyMap = null;
this.sourceSstableId = sourceSstableId;
this.sourceRowId = sourceRowId;
this.sourceSstableMinKey = sourceSstableMinKey;
this.sourceSstableMaxKey = sourceSstableMaxKey;
}

private PrimaryKey primaryKey()
{
if (delegatePrimaryKey == null)
{
delegatePrimaryKey = primaryKeyMap.primaryKeyFromRowId(sourceRowId);
primaryKeyMap = null; // Removes the no longer needed reference to the primary key map.
}

return delegatePrimaryKey;
}
Expand All @@ -74,13 +68,10 @@ public SSTableId<?> getSourceSstableId()
}

@Override
public PrimaryKeyWithSource forStaticRow()
public PrimaryKey forStaticRow()
{
return new PrimaryKeyWithSource(primaryKey().forStaticRow(),
sourceSstableId,
sourceRowId,
sourceSstableMinKey,
sourceSstableMaxKey);
// We cannot use row awareness if we need a static row.
return primaryKey().forStaticRow();
}

@Override
Expand All @@ -104,7 +95,8 @@ public Clustering clustering()
@Override
public PrimaryKey loadDeferred()
{
return primaryKey().loadDeferred();
primaryKey().loadDeferred();
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
Expand Down Expand Up @@ -67,6 +68,11 @@ public PrimaryKey create(DecoratedKey partitionKey, Clustering clustering)
return new RowAwarePrimaryKey(partitionKey.getToken(), partitionKey, clustering, null);
}

PrimaryKey createWithSource(PrimaryKeyMap primaryKeyMap, long sstableRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
{
return new PrimaryKeyWithSource(primaryKeyMap, sstableRowId, sourceSstableMinKey, sourceSstableMaxKey);
}

private class RowAwarePrimaryKey implements PrimaryKey
{
private Token token;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ public static class RowAwarePrimaryKeyMapFactory implements Factory
private FileHandle termsTrie = null;
private final IPartitioner partitioner;
private final ClusteringComparator clusteringComparator;
private final PrimaryKey.Factory primaryKeyFactory;
private final RowAwarePrimaryKeyFactory primaryKeyFactory;
private final SSTableId<?> sstableId;
private final boolean hasStaticColumns;

public RowAwarePrimaryKeyMapFactory(IndexComponents.ForRead perSSTableComponents, PrimaryKey.Factory primaryKeyFactory, SSTableReader sstable)
public RowAwarePrimaryKeyMapFactory(IndexComponents.ForRead perSSTableComponents, RowAwarePrimaryKeyFactory primaryKeyFactory, SSTableReader sstable)
{
try
{
Expand All @@ -105,6 +106,7 @@ public RowAwarePrimaryKeyMapFactory(IndexComponents.ForRead perSSTableComponents
this.primaryKeyFactory = primaryKeyFactory;
this.clusteringComparator = sstable.metadata().comparator;
this.sstableId = sstable.getId();
this.hasStaticColumns = sstable.metadata().hasStaticColumns();
}
catch (Throwable t)
{
Expand All @@ -124,7 +126,8 @@ public PrimaryKeyMap newPerSSTablePrimaryKeyMap()
partitioner,
primaryKeyFactory,
clusteringComparator,
sstableId);
sstableId,
hasStaticColumns);
}
catch (IOException e)
{
Expand All @@ -149,17 +152,19 @@ public void close() throws IOException
private final SortedTermsReader sortedTermsReader;
private final SortedTermsReader.Cursor cursor;
private final IPartitioner partitioner;
private final PrimaryKey.Factory primaryKeyFactory;
private final RowAwarePrimaryKeyFactory primaryKeyFactory;
private final ClusteringComparator clusteringComparator;
private final SSTableId<?> sstableId;
private final boolean hasStaticColumns;

private RowAwarePrimaryKeyMap(LongArray rowIdToToken,
SortedTermsReader sortedTermsReader,
SortedTermsReader.Cursor cursor,
IPartitioner partitioner,
PrimaryKey.Factory primaryKeyFactory,
RowAwarePrimaryKeyFactory primaryKeyFactory,
ClusteringComparator clusteringComparator,
SSTableId<?> sstableId)
SSTableId<?> sstableId,
boolean hasStaticColumns)
{
this.rowIdToToken = rowIdToToken;
this.sortedTermsReader = sortedTermsReader;
Expand All @@ -168,6 +173,7 @@ private RowAwarePrimaryKeyMap(LongArray rowIdToToken,
this.primaryKeyFactory = primaryKeyFactory;
this.clusteringComparator = clusteringComparator;
this.sstableId = sstableId;
this.hasStaticColumns = hasStaticColumns;
}

@Override
Expand All @@ -188,6 +194,13 @@ public PrimaryKey primaryKeyFromRowId(long sstableRowId)
return primaryKeyFactory.createDeferred(partitioner.getTokenFactory().fromLongValue(token), () -> supplier(sstableRowId));
}

@Override
public PrimaryKey primaryKeyFromRowId(long sstableRowId, PrimaryKey lowerBound, PrimaryKey upperBound)
{
return hasStaticColumns ? primaryKeyFromRowId(sstableRowId)
: primaryKeyFactory.createWithSource(this, sstableRowId, lowerBound, upperBound);
}

private long skinnyExactRowIdOrInvertedCeiling(PrimaryKey key)
{
// Fast path when there is no clustering, i.e., there is one row per partition.
Expand All @@ -212,6 +225,13 @@ private long skinnyExactRowIdOrInvertedCeiling(PrimaryKey key)
@Override
public long exactRowIdOrInvertedCeiling(PrimaryKey key)
{
if (key instanceof PrimaryKeyWithSource)
{
var pkws = (PrimaryKeyWithSource) key;
if (pkws.getSourceSstableId().equals(sstableId))
return pkws.getSourceRowId();
}

if (clusteringComparator.size() == 0)
return skinnyExactRowIdOrInvertedCeiling(key);

Expand All @@ -226,6 +246,13 @@ public long exactRowIdOrInvertedCeiling(PrimaryKey key)
@Override
public long ceiling(PrimaryKey key)
{
if (key instanceof PrimaryKeyWithSource)
{
var pkws = (PrimaryKeyWithSource) key;
if (pkws.getSourceSstableId().equals(sstableId))
return pkws.getSourceRowId();
}

if (clusteringComparator.size() == 0)
{
long rowId = skinnyExactRowIdOrInvertedCeiling(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public PrimaryKey.Factory newPrimaryKeyFactory(ClusteringComparator comparator)
@Override
public PrimaryKeyMap.Factory newPrimaryKeyMapFactory(IndexComponents.ForRead perSSTableComponents, PrimaryKey.Factory primaryKeyFactory, SSTableReader sstable)
{
return new RowAwarePrimaryKeyMap.RowAwarePrimaryKeyMapFactory(perSSTableComponents, primaryKeyFactory, sstable);
assert primaryKeyFactory instanceof RowAwarePrimaryKeyFactory;
return new RowAwarePrimaryKeyMap.RowAwarePrimaryKeyMapFactory(perSSTableComponents, (RowAwarePrimaryKeyFactory) primaryKeyFactory, sstable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.cassandra.index.sai.QueryContext;
import org.apache.cassandra.index.sai.disk.PostingList;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
import org.apache.cassandra.index.sai.disk.PrimaryKeyWithSource;
import org.apache.cassandra.index.sai.disk.v1.IndexSearcher;
import org.apache.cassandra.index.sai.disk.v1.PerIndexFiles;
import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata;
Expand Down Expand Up @@ -534,12 +533,7 @@ private SegmentRowIdOrdinalPairs flatmapPrimaryKeysToBitsAndRows(List<PrimaryKey
{
// turn the pk back into a row id, with a fast path for the case where the pk is from this sstable
var primaryKey = keysInRange.get(i);
long sstableRowId;
if (primaryKey instanceof PrimaryKeyWithSource
&& ((PrimaryKeyWithSource) primaryKey).getSourceSstableId().equals(primaryKeyMap.getSSTableId()))
sstableRowId = ((PrimaryKeyWithSource) primaryKey).getSourceRowId();
else
sstableRowId = primaryKeyMap.exactRowIdOrInvertedCeiling(primaryKey);
long sstableRowId = primaryKeyMap.exactRowIdOrInvertedCeiling(primaryKey);

if (sstableRowId < 0)
{
Expand Down
Loading