Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generic sorting (ORDER BY) on non-clustering columns using SAI #1073

Merged
merged 87 commits into from
Jul 31, 2024

Conversation

michaeljmarshall
Copy link
Member

@michaeljmarshall michaeljmarshall commented Apr 3, 2024

High Level Design

Add support for ORDER BY via SAI. The PR essentially makes the ANN vector ordering logic more generic by adding a few abstractions. This PR adds support for ordering all existing BA and CA indexes without writing any new index files. AA indexes cannot be sorted because the primary key map is based on partitions, not primary keys.

New Abstractions

  • PrimaryKeyWithSortKey - this is a primary key that has some metadata that can be used for sorting. The two implementations at the moment are a float used for vector similarity score and a ByteComparable object used from numeric and text indexes.
  • RowWithSourceTable and CellWithSourceTable - this abstraction gives us a way to validate the source of a given cell. That source is then used to confirm whether an index's ranking of the data is valid.

Notable design details

  • ORDER BY on SAI requires CL=1.
  • Updates on same memtable: unlike the VectorMemtableIndex, the TrieMemoryIndex does not override MemtableIndex#update. Instead, an update is handled as an insertion. That presents two problems. First, the memtable has both values, and second, the resulting sstable index has both values. This PR addresses that edge case by comparing the index's view of the value to memtable/sstable's live view of the value for the row. If implement MemtableIndex#update, we can remove the PrimaryKeyWithSortKey#isIndexDataEqualToLiveData guard.
  • The RowWithSourceTable and CellWithSourceTable are used by vector ordering to prevent re-calculating the score in the TopKProcessor. Once we solve the update problem for numeric and text indexes, we will do the same for those indexes.
  • In order for the "source table" logic to work, we must acquire a single view of the sstables and memtables and their associated indexes. Otherwise, we risk classifying a valid row as "updated". In the StorageAttachedIndexSearcher, we get this view and hold it for the duration of the query. Because hyrbid queries do not need to have the source table validated, they are not included in the view logic, which simplifies determining which indexes need to be locked.
  • Hybrid queries are only weakly index based. Because we do not have an easy way to map from PrimaryKey to value in our numeric and text indexes, the quickest solution to implement was to read the value from the sstable and then sort in the index.
  • @pkolaczk updated the cost based query planner for kd tree and trie based ordering. They might need tweaking in subsequent testing.

Unsupported data types

Currently, there are 3 types that are not sortable via SAI:

  • InetAddressType because we need to add decoding logic based on SAI's TypeUtil.encode method, and because SAI sorts InetAddress objects differently that Cassandra's InetAddressType comparator sorts them
  • DecimalType because SAI truncates the value to 24 bytes
  • IntegerType because SAI truncates to 20 bytes

The first one could be fixed based in the current paradigm, and the last two will likely require a change to the index format. Technically, we could optionally pull out all of the values that have more than 20 or 24 bytes, but I think that isn't worth the effort.

Dependencies

The following PRs originally had the following dependencies merged to vsearch and then into this branch:

michaeljmarshall and others added 22 commits April 2, 2024 14:11
Key changes in this commit:

* Replace ScorePrimaryKey with PrimaryKeyWithSortKey. This
is an abstract class that currently implements PrimaryKey
and allows for a subclass to supply a sort key used for
both sorting peer PrimaryKeys and will eventually support
validating rows read from storage.

* Similarly, replace ScoredRowId with RowIdWithMeta, though
the name meta might become SortKey in a later commit.

This commit does not introduce any new feature and all tests
should still pass.
This commit does not work, but I am
adding it to save my work.
This commit does make many tests pass,
but it is not the ideal solution. I'll
follow up tomorrow to clean up the logic
and abstractions.
* Also adds more test coverage, the tests
found some issues and I fixed those.

* I still have some open questions about
the correct ByteComparable implementation
details. I am going to rely on tests to
catch issues.
Vector uses a similarity score calculation to ensure
correctness of order. However, such computation is not
possible if do not have the source value stored in the
index or if we're unable to efficiently find the source
value. Instead, we can use the source value for a given
cell to determine if the index's source is the same.

When the materialized cell and the index's value
come from the same ss or memtable, we know that the
value was not subsequently updated and we therefore
know that the index's ordering is correct. If they
do not match, the ordering correctness is not known
(though it could still be correct if overwritten by
the same value), so we throw out the row to ensure
absolute correctness.

Future work will remove our reliance on scores and
ByteComparable objects and we'll replace them with
an overquery of at least one extra row per sstable
and memtable. This is required because switching
to validating source reference equality renders
index ordering results unable to be compared.

However, the one benefit is that we will not
need to do any computations or byte buffer
comparisons to determine order validity. Thus,
the new design will have some benefits and some
drawbacks. The largest benefit is supporting
a completely new class of types that are
eligible for ORDER BY clauses.
There are several valid scenarios where the source
tables are not equal, but the rows could have scores
indicating the row is valid. See several tests
in VectorUpdateDeleteTest that fail on the last
commit. With this change, those tests are passing.
I include stricter assertions on row order to ensure
we do not have a subtle regression.

Note that the cases where the score would otherwise
indicate the row is valid are cases where we are
doing something suboptimal. However, these are
unlikely scenarios and not ones we need to optimize
for. Instead, we're prioritizing supporting more
generic order by.
So far, everything is ascending and a
bit hacky. I need to write more tests.
Now that we're ordering a bunch of types,
it is relevant to get a bit more of a
framework of tests.
With this change, we can do hybrid queries on
non-ann order by queries.
SimpleDateType is serialized incorrectly
for ordering.

InetAddressType might be sortable, but
I am skipping for now.

UUIDType does not seem meaningful
to sort, but I could be mistaken.
This commit is a hack to prevent issues
related to using an empty byte buffer in
the SimpleExpression class.

I tried to find a way to break orderings
out of the current framework, but it is not
easy to do, so for now, we continue to
deepen the hack...
Because we are using memtable references and sstable ids
to validate that a given cell in an ORDER BY result
out of an SAI index is valid, we need to make sure
that we have a single, consistent view of the indexes
and their associated mem/ss tables. The abstraction
could probably improve, but the underlying design
seems to work.

We do not use the lock on the boolean predicates because
those are validated using their actual predicate logic,
and therefore, we can skip any extra work to use the
ORDER BY view for the WHERE clauses.
This is still a WIP. The abstraction is getting closer
though.
All of the failing tests were related to
merging SingleColumnRestrictions. Because
I moved the logic to add ordering restrictions
earlier in the StatementRestrictions#doBuild
method, I changed the order that we can expect
restrictions to get merged. This simplifies the
logic and keeps the complexity all within the
ordering restrictions, which might someday
get moved out to their own abstraction.

Note: I still need to fix SelectOrderByTest.
@michaeljmarshall michaeljmarshall changed the title Add support for ordering on non-clustering columns using SAI Add generic sorting (ORDER BY) on non-clustering columns using SAI Jul 22, 2024
@michaeljmarshall michaeljmarshall changed the title Add generic sorting (ORDER BY) on non-clustering columns using SAI Add generic sorting ORDER BY on non-clustering columns using SAI Jul 22, 2024
@michaeljmarshall michaeljmarshall changed the title Add generic sorting ORDER BY on non-clustering columns using SAI Add generic sorting (ORDER BY) on non-clustering columns using SAI Jul 22, 2024
@michaeljmarshall
Copy link
Member Author

@eolivelli @pkolaczk - I think this PR is very close (I know we want it merged as soon as possible), but I reviewed some of the TODOs left in the code, and I think it might be worth either another pass before merging or shortly after merging. All of the tests are passing, which is a great sign, but the TODOs are some open questions I had when writing the PR and I still don't seem to have answers yet.

@pkolaczk
Copy link

pkolaczk commented Jul 26, 2024

I found a bug:

ERROR [ReadStage-1] 2024-07-26 16:37:15,253 AbstractLocalAwareExecutorService.java:169 - Uncaught exception on thread Thread[ReadStage-1,5,main]
java.lang.NullPointerException: null
	at org.apache.cassandra.utils.bytecomparable.ByteComparable.compare(ByteComparable.java:138)
	at org.apache.cassandra.index.sai.utils.PrimaryKeyWithByteComparable.compareTo(PrimaryKeyWithByteComparable.java:66)
	at org.apache.cassandra.index.sai.utils.PrimaryKeyWithByteComparable.compareTo(PrimaryKeyWithByteComparable.java:33)
	at java.base/java.util.Comparator.lambda$comparing$77a9974f$1(Comparator.java:469)
	at java.base/java.util.PriorityQueue.siftUpUsingComparator(PriorityQueue.java:675)
	at java.base/java.util.PriorityQueue.siftUp(PriorityQueue.java:652)
	at java.base/java.util.PriorityQueue.offer(PriorityQueue.java:345)
	at java.base/java.util.PriorityQueue.add(PriorityQueue.java:326)
	at org.apache.cassandra.index.sai.utils.MergePrimaryWithSortKeyIterator.computeNext(MergePrimaryWithSortKeyIterator.java:76)
	at org.apache.cassandra.index.sai.utils.MergePrimaryWithSortKeyIterator.computeNext(MergePrimaryWithSortKeyIterator.java:38)
	at org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:47)
	at com.google.common.collect.Iterators$PeekingImpl.hasNext(Iterators.java:1126)
	at org.apache.cassandra.index.sai.utils.MergePrimaryWithSortKeyIterator.computeNext(MergePrimaryWithSortKeyIterator.java:75)
	at org.apache.cassandra.index.sai.utils.MergePrimaryWithSortKeyIterator.computeNext(MergePrimaryWithSortKeyIterator.java:38)
	at org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:47)
	at org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher$ScoreOrderedResultRetriever.nextSelectedKeyInRange(StorageAttachedIndexSearcher.java:536)
	at org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher$ScoreOrderedResultRetriever.nextRowIterator(StorageAttachedIndexSearcher.java:504)
	at org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher$ScoreOrderedResultRetriever.computeNext(StorageAttachedIndexSearcher.java:486)
	at org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher$ScoreOrderedResultRetriever.computeNext(StorageAttachedIndexSearcher.java:447)
	at org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:47)
	at org.apache.cassandra.index.sai.plan.TopKProcessor.filterInternal(TopKProcessor.java:222)
	at org.apache.cassandra.index.sai.plan.TopKProcessor.filter(TopKProcessor.java:157)
	at org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher.search(StorageAttachedIndexSearcher.java:139)
	at org.apache.cassandra.db.ReadCommand.searchStorage(ReadCommand.java:512)
	at org.apache.cassandra.db.ReadCommand.executeLocally(ReadCommand.java:410)
	at org.apache.cassandra.db.ReadCommandVerbHandler.doVerb(ReadCommandVerbHandler.java:58)
	at org.apache.cassandra.net.InboundSink.lambda$new$0(InboundSink.java:79)
	at org.apache.cassandra.net.InboundSink.accept(InboundSink.java:98)
	at org.apache.cassandra.net.InboundSink.accept(InboundSink.java:46)
	at org.apache.cassandra.net.InboundMessageHandler$ProcessMessage.run(InboundMessageHandler.java:436)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$FutureTask.run(AbstractLocalAwareExecutorService.java:165)
	at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$LocalSessionFutureTask.run(AbstractLocalAwareExecutorService.java:137)
	at org.apache.cassandra.concurrent.SEPWorker.run(SEPWorker.java:119)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:829)

Reproduction

Load data with latte order.rn workload mentioned here (https://docs.google.com/document/d/11qqu9D1DM8kYTFP3aFhaxFkd4zRnBI6Yh-QPrlnBzfw/edit?usp=sharing)

Do not flush after loading.

Issue the query:

SELECT * FROM orderby WHERE a >= 0.1 AND a < 0.2 ORDER BY a LIMIT 10

It works fine after flushing. I mean, almost - because it is still slow (takes 3 seconds!)

@michaeljmarshall
Copy link
Member Author

@pkolaczk - nice catch. I introduced that bug while resolving merge conflicts.

@pkolaczk
Copy link

Retested it and now it works fine.

Copy link

@pkolaczk pkolaczk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is overall great new feature!

I'm conditionally approving this, provided the following issues are fixed:

  • SELECT * FROM tab WHERE a > const ORDER BY a queries currently don't work because of huge performance cost; neither for on-disk or in-memory indexes. Either let's fix it or at least disable it?

  • Don't forget to merge the PR for lazy memory index range scans as that PR solves the first performance issues I mentioned in the doc.

Copy link

sonarcloud bot commented Jul 26, 2024

@jkni
Copy link

jkni commented Jul 30, 2024

Reviewed the most recent five commits, particularly the CANONICAL filter change. The CANONICAL change resolves all my local reproducers for issues with source Object mismatch when using LIVE sstables containing preemptively opened readers. LGTM!

This test started failing
when I inverted the compareTo
for the PrimaryKeyWithScore.
Copy link

sonarcloud bot commented Jul 30, 2024

@michaeljmarshall
Copy link
Member Author

This is overall great new feature!

I'm conditionally approving this, provided the following issues are fixed:

* `SELECT * FROM tab WHERE a > const ORDER BY a` queries currently don't work because of huge performance cost; neither for on-disk or in-memory indexes. Either let's fix it or at least disable it?

* Don't forget to merge the PR for lazy memory index range scans as that PR solves the first performance issues I mentioned in the doc.

Merging this PR with the understanding that we will follow up with individual PRs to address these issues.

@michaeljmarshall michaeljmarshall merged commit 628c61d into main Jul 31, 2024
441 of 451 checks passed
@michaeljmarshall michaeljmarshall deleted the sai-order-by branch July 31, 2024 02:42
djatnieks pushed a commit that referenced this pull request Aug 21, 2024
…1073)

Add support for `ORDER BY` via SAI. The PR essentially makes the ANN vector ordering logic more generic by adding a few abstractions. This PR adds support for ordering all existing BA and CA indexes without writing any new index files. AA indexes cannot be sorted because the primary key map is based on partitions, not primary keys.

* `PrimaryKeyWithSortKey` - this is a primary key that has some metadata that can be used for sorting. The two implementations at the moment are a `float` used for vector similarity score and a `ByteComparable` object used from numeric and text indexes.
* `RowWithSourceTable` and `CellWithSourceTable` - this abstraction gives us a way to validate the source of a given cell. That source is then used to confirm whether an index's ranking of the data is valid.

* `ORDER BY` on SAI requires CL=1.
* Updates on same memtable: unlike the `VectorMemtableIndex`, the `TrieMemoryIndex` does not override `MemtableIndex#update`. Instead, an update is handled as an insertion. That presents two problems. First, the memtable has both values, and second, the resulting sstable index has both values. This PR addresses that edge case by comparing the index's view of the value to memtable/sstable's live view of the value for the row. If implement `MemtableIndex#update`, we can remove the `PrimaryKeyWithSortKey#isIndexDataEqualToLiveData` guard.
* The `RowWithSourceTable` and `CellWithSourceTable` are used by vector ordering to prevent re-calculating the score in the `TopKProcessor`. Once we solve the update problem for numeric and text indexes, we will do the same for those indexes.
* In order for the "source table" logic to work, we must acquire a single view of the sstables and memtables and their associated indexes. Otherwise, we risk classifying a valid row as "updated". In the `StorageAttachedIndexSearcher`, we get this view and hold it for the duration of the query. Because hyrbid queries do not need to have the source table validated, they are not included in the view logic, which simplifies determining which indexes need to be locked.
* Hybrid queries are only weakly index based. Because we do not have an easy way to map from `PrimaryKey` to `value` in our numeric and text indexes, the quickest solution to implement was to read the value from the sstable and then sort in the index.
* @pkolaczk updated the cost based query planner for kd tree and trie based ordering. They might need tweaking in subsequent testing.

Currently, there are 3 types that are not sortable via SAI:

* `InetAddressType` because we need to add decoding logic based on SAI's TypeUtil.encode method, and because SAI sorts `InetAddress` objects differently that Cassandra's `InetAddressType` comparator sorts them
* `DecimalType` because SAI truncates the value to 24 bytes
* `IntegerType` because SAI truncates to 20 bytes

The first one could be fixed based in the current paradigm, and the last two will likely require a change to the index format. Technically, we could optionally pull out all of the values that have more than 20 or 24 bytes, but I think that isn't worth the effort.

---------

Co-authored-by: Piotr Kołaczkowski <pkolaczk@datastax.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants