Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support sorting on complex columns in MSQ #16322

Merged
merged 29 commits into from
May 13, 2024

Conversation

LakshSingla
Copy link
Contributor

@LakshSingla LakshSingla commented Apr 23, 2024

Description

MSQ sorts the columns in a highly specialized manner by byte comparisons. As such the values are serialized differently. This works well for the primitive types and primitive arrays, however complex types cannot be serialized specially.

This PR adds the support for sorting the complex columns by deserializing the value from the field and comparing it via the type strategy. This is a lot slower than the byte comparisons, however, it's the only way to support sorting on complex columns that can have arbitrary serialization not optimized for MSQ.

The primitives and the arrays are still compared via the byte comparison, therefore this doesn't affect the performance of the queries supported before the patch. If there's a sorting key with mixed complex and primitive/primitive array types, for example: longCol1 ASC, longCol2 ASC, complexCol1 DESC, complexCol2 DESC, stringCol1 DESC, longCol3 DESC, longCol4 ASC, the comparison will happen like:

  • longCol1, longCol2 (ASC) - Compared together via byte-comparison, since both are byte comparable and need to be sorted in ascending order
  • complexCol1 (DESC) - Compared via deserialization, cannot be clubbed with any other field
  • complexCol2 (DESC) - Compared via deserialization, cannot be clubbed with any other field, even though the prior field was a complex column with the same order
  • stringCol1, longCol3 (DESC) - Compared together via byte-comparison, since both are byte comparable and need to be sorted in descending order
  • longCol4 (ASC) - Compared via byte-comparison, couldn't be coalesced with the previous fields as the direction was different

This way, we only deserialize the field wherever required

Backward Compatibility

No backward incompatible change has been made.

Future work

Complex types can expose and implement special handling for types such as SerializablePairs, which can be serialized in a byte-comparable way and won't need to be deserialized for comparison. It will fit in easily with the current changes.

Release note

MSQ now supports sorting on all the complex columns, and grouping on the complex columns that support it.


Key changed/added classes in this PR
  • MyFoo
  • OurBar
  • TheirBaz

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions bot added Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Apr 23, 2024
@LakshSingla LakshSingla added this to the 30.0.0 milestone Apr 29, 2024
Copy link
Contributor

@cryptoe cryptoe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add uts for

  • The runLength stuff
  • ByteComparators
  • Frame comparators
  • ByteFrameComparators.

@LakshSingla
Copy link
Contributor Author

LakshSingla commented May 8, 2024

Thanks for the review @cryptoe. I have addressed the comments and added tests for all the comparisons that I have added as a part of this patch.

  • RunLength stuff -> RowKeyComparisonRunLengthsTest
  • byte v byte comparison -> ByteRowKeyComparatorTest and RowKeyComparatorTest
  • Frame v byte comparison -> FrameComparisonWidgetImplTest
  • Frame v Frame comparison -> FrameWriterTestData, FrameWriterTest

@LakshSingla
Copy link
Contributor Author

LakshSingla commented May 8, 2024

While making the changes suggested in #16322 (comment), I realized this would be a backward incompatible change. This will modify the way the complex fields are written on the frame. Since frames are materialized in durable storage, frames generated by the newer byte-comparable writers won't get read properly by the older clusters, and the newer ones won't read the frames generated by the older clusters.
To properly modify the way the complex fields are serialized, we would need to have a newer field reader + writer combo, and enable it via a cluster-wide config (I can't find a way to do it without a config). Since that will increase the scope of the PR widely, I am punting it in this PR.

Copy link
Contributor

@gianm gianm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other than the line comments, please run a benchmark to verify that performance has not degraded for the non-complex case. You can use FrameChannelMergerBenchmark. For extra credit, extend it to optionally include complex keys and benchmark that as well.

@@ -1086,6 +1086,8 @@ public void testGroupByRootSingleTypeStringMixed2Sparse()
@Test
public void testGroupByRootSingleTypeStringMixed2SparseJsonValueNonExistentPath()
{
// Fails while planning
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MSQ requires the scan signature, however the path doesn't exist. Therefore https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/virtual/NestedFieldVirtualColumn.java#L1279 returns null when the column capabilities is asked, while the planner requires the capability to determine the column's type https://github.com/apache/druid/blob/master/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java#L1733-L1733.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the comment with the reason

assert runLengthEntry.getRunLength() == 1;
// 'fieldsComparedTillNow' is the index of the current keyColumn in the keyColumns list. Sanity check that its
// a known complex type
ColumnType columnType = signature.getColumnType(keyColumns.get(fieldsComparedTillNow).columnName())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid this map lookup for getColumnType(String) and also avoid the lookup by string in serdeMap. That's expensive stuff to be doing for each comparison. Instead, how about doing a serdes that is a pre-built ComplexMetricsSerde[] (in the constructor), indexed by position in keyColumns. Each entry in serdes is either a serde (for complex types) or null otherwise. The entire serdes should be null if there are no complex types.

assert runLengthEntry.getRunLength() == 1;
// 'fieldsComparedTillNow' is the index of the current keyColumn in the keyColumns list. Sanity check that its
// a known complex type
ColumnType columnType = rowSignature.getColumnType(keyColumns.get(fieldsComparedTillNow).columnName())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment to FrameComparisonWidgetImpl: we should avoid this map lookup for getColumnType(String) and also avoid the lookup by string in serdeMap. That's expensive stuff to be doing for each comparison. Instead, how about doing a serdes that is a pre-built ComplexMetricsSerde[] (in the constructor), indexed by position in keyColumns. Each entry in serdes is either a serde (for complex types) or null otherwise. The entire serdes should be null if there are no complex types.

I don't think rowSignature is used for anything else, so when we make this change we can also get rid of the rowSignature field.

import java.util.Collections;
import java.util.List;

public class RowKeyComparisonRunLengthsTest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a situation like this, I like to have one test that does a whole space of things. Here I suggest a test case that checks all possible length-3 keys where we vary each key element's type between string and complex, and direction between asc and desc. So that's 4 possibilities for each key element (string asc, string desc, complex asc, complex desc) and therefore 64 keys we're testing. It will run fast since it's only 64 cases, and it gets good coverage of different possibilities for runs and orderings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was considering having a parameterized test, however, how do I add the expected results for each test case generically, without repeating the logic in RowKeyComparisonRunLengths.

For example, if the input test case is "complex ASC, string DESC, string DESC"; how can I get the expected value of the result, without repeating what I have written in RowKeyComparisonRunLength? One way would be iterating the results of all 64 combinations, which can be done, but I wanted to confirm if that's what you referred to in the comment.

The test case would read something like:

assertionHelper(
  HelperUtils.createCheckerFor(new RunLength(false, 1, ASC), new RunLength(true, 2, DESC))
  RowKeyComparisonRunLength.create("complex ASC, string DESC, string DESC")
);
.... (total 64 entries)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking of having all 64 test cases enumerated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the test cases - the tests are generated programmatically in the loop (mapping each index to the corresponding permutation of the key column), and the expectations are enumerated corresponding to each test case

@LakshSingla
Copy link
Contributor Author

Regarding the msqIncompatible, I figured out that about a half of them were failing due to not reading the correct file in the test case. I didn't inspect them deeply while allowing the nested tests on MSQ since the group by ones (that I was primarily interested in) were correctly working with MSQ. So the tests were failing with incorrect results because the dataset (all_auto) wasn't same between MSQ and the native tests.

I have fixed up those test cases and resolved the comments pertaining to those tests.

@gianm
Copy link
Contributor

gianm commented May 9, 2024

Regarding the msqIncompatible, I figured out that about a half of them were failing due to not reading the correct file in the test case. I didn't inspect them deeply while allowing the nested tests on MSQ since the group by ones (that I was primarily interested in) were correctly working with MSQ. So the tests were failing with incorrect results because the dataset (all_auto) wasn't same between MSQ and the native tests.

I have fixed up those test cases and resolved the comments pertaining to those tests.

Thank you! Good to know, I was worried something was wrong with the implementation on the MSQ side.

@LakshSingla
Copy link
Contributor Author

Benchmarked the pre-existing code, there isn't a regression in the byte comparable types:

master
Benchmark                                  (channelDistributionString)  (keyGeneratorString)  (keyLength)  (numChannels)  (numRows)  (rowLength)  Mode  Cnt     Score     Error  Units
FrameChannelMergerBenchmark.mergeChannels                  round_robin                random           20              2    5000000          100  avgt    5  1020.405 ±  70.317  ms/op
FrameChannelMergerBenchmark.mergeChannels                  round_robin                random           20             16    5000000          100  avgt    5  1583.528 ± 523.469  ms/op
FrameChannelMergerBenchmark.mergeChannels                  round_robin            sequential           20              2    5000000          100  avgt    5  1206.522 ± 323.548  ms/op
FrameChannelMergerBenchmark.mergeChannels                  round_robin            sequential           20             16    5000000          100  avgt    5  1996.365 ± 339.964  ms/op
FrameChannelMergerBenchmark.mergeChannels                    clustered                random           20              2    5000000          100  avgt    5   860.327 ±  65.100  ms/op
FrameChannelMergerBenchmark.mergeChannels                    clustered                random           20             16    5000000          100  avgt    5  1346.599 ±  38.168  ms/op
FrameChannelMergerBenchmark.mergeChannels                    clustered            sequential           20              2    5000000          100  avgt    5   955.820 ± 167.006  ms/op
FrameChannelMergerBenchmark.mergeChannels                    clustered            sequential           20             16    5000000          100  avgt    5  1162.238 ±  52.815  ms/op


This patch:
Benchmark                                  (channelDistributionString)  (keyGeneratorString)  (keyLength)  (numChannels)  (numRows)  (rowLength)  Mode  Cnt     Score     Error  Units
FrameChannelMergerBenchmark.mergeChannels                  round_robin                random           20              2    5000000          100  avgt    5   938.234 ±  38.388  ms/op
FrameChannelMergerBenchmark.mergeChannels                  round_robin                random           20             16    5000000          100  avgt    5  1392.245 ±  46.723  ms/op
FrameChannelMergerBenchmark.mergeChannels                  round_robin            sequential           20              2    5000000          100  avgt    5  1001.648 ±  24.656  ms/op
FrameChannelMergerBenchmark.mergeChannels                  round_robin            sequential           20             16    5000000          100  avgt    5  1902.121 ± 115.118  ms/op
FrameChannelMergerBenchmark.mergeChannels                    clustered                random           20              2    5000000          100  avgt    5   915.268 ±  39.973  ms/op
FrameChannelMergerBenchmark.mergeChannels                    clustered                random           20             16    5000000          100  avgt    5  1458.134 ± 240.780  ms/op
FrameChannelMergerBenchmark.mergeChannels                    clustered            sequential           20              2    5000000          100  avgt    5   895.689 ±  38.291  ms/op
FrameChannelMergerBenchmark.mergeChannels                    clustered            sequential           20             16    5000000          100  avgt    5  1218.842 ± 266.408  ms/op

I'll update this comment once I benchmark the complex comparison code as well

@LakshSingla
Copy link
Contributor Author

Thanks for a thorough review @gianm 😄
I have updated the PR since the last review with all of the review comments. Here's a summary of the changes:

  1. Regarding the CalciteNestedDataQueryTest failures - I have addressed all of those. Few tests are still incompatible due to MSQ's limitation of not having field writers for nested arrays, and a few are incompatible because the nested virtual column isn't able to provide the correct type inference for sparse or non existent types.
  2. Added the benchmarks and tested out that there's no regression in the code before and after this patch. I am rerunning the benchmarks with the nested data stuff. This will also add as a sanity check the code after the latest round of changes.
  3. Modified the methods to not wrap the byte array into a Memory
  4. Modified the readers to cache the column type and metric serde in an array
  5. Have added the tests mentioned in the review.

Copy link
Contributor

@gianm gianm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The recent changes are looking good to me. I had a few cleanup comments.

@@ -7072,6 +7090,8 @@ public void testUnnestJsonQueryArraysJsonValueSum()
@Test
public void testJsonValueNestedEmptyArray()
{
// Returns incorrect results with MSQ
msqIncompatible();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, we can leave this for another time.

Copy link
Contributor

@cryptoe cryptoe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes LGTM!!.

@LakshSingla
Copy link
Contributor Author

Thanks for the reviews @gianm and @cryptoe. I have addressed the last round of comments in the latest commits.

@cryptoe cryptoe merged commit 4bfc186 into apache:master May 13, 2024
87 checks passed
@cryptoe
Copy link
Contributor

cryptoe commented May 13, 2024

Thanks @LakshSingla for the updates.

adarshsanjeev pushed a commit to adarshsanjeev/druid that referenced this pull request May 13, 2024
MSQ sorts the columns in a highly specialized manner by byte comparisons. As such the values are serialized differently. This works well for the primitive types and primitive arrays, however complex types cannot be serialized specially.

This PR adds the support for sorting the complex columns by deserializing the value from the field and comparing it via the type strategy. This is a lot slower than the byte comparisons, however, it's the only way to support sorting on complex columns that can have arbitrary serialization not optimized for MSQ.

The primitives and the arrays are still compared via the byte comparison, therefore this doesn't affect the performance of the queries supported before the patch. If there's a sorting key with mixed complex and primitive/primitive array types, for example: longCol1 ASC, longCol2 ASC, complexCol1 DESC, complexCol2 DESC, stringCol1 DESC, longCol3 DESC, longCol4 ASC, the comparison will happen like:

    longCol1, longCol2 (ASC) - Compared together via byte-comparison, since both are byte comparable and need to be sorted in ascending order
    complexCol1 (DESC) - Compared via deserialization, cannot be clubbed with any other field
    complexCol2 (DESC) - Compared via deserialization, cannot be clubbed with any other field, even though the prior field was a complex column with the same order
    stringCol1, longCol3 (DESC) - Compared together via byte-comparison, since both are byte comparable and need to be sorted in descending order
    longCol4 (ASC) - Compared via byte-comparison, couldn't be coalesced with the previous fields as the direction was different

This way, we only deserialize the field wherever required
LakshSingla added a commit that referenced this pull request May 13, 2024
* Support sorting on complex columns in MSQ (#16322)

MSQ sorts the columns in a highly specialized manner by byte comparisons. As such the values are serialized differently. This works well for the primitive types and primitive arrays, however complex types cannot be serialized specially.

This PR adds the support for sorting the complex columns by deserializing the value from the field and comparing it via the type strategy. This is a lot slower than the byte comparisons, however, it's the only way to support sorting on complex columns that can have arbitrary serialization not optimized for MSQ.

The primitives and the arrays are still compared via the byte comparison, therefore this doesn't affect the performance of the queries supported before the patch. If there's a sorting key with mixed complex and primitive/primitive array types, for example: longCol1 ASC, longCol2 ASC, complexCol1 DESC, complexCol2 DESC, stringCol1 DESC, longCol3 DESC, longCol4 ASC, the comparison will happen like:

    longCol1, longCol2 (ASC) - Compared together via byte-comparison, since both are byte comparable and need to be sorted in ascending order
    complexCol1 (DESC) - Compared via deserialization, cannot be clubbed with any other field
    complexCol2 (DESC) - Compared via deserialization, cannot be clubbed with any other field, even though the prior field was a complex column with the same order
    stringCol1, longCol3 (DESC) - Compared together via byte-comparison, since both are byte comparable and need to be sorted in descending order
    longCol4 (ASC) - Compared via byte-comparison, couldn't be coalesced with the previous fields as the direction was different

This way, we only deserialize the field wherever required

* Fix conflicts

---------

Co-authored-by: Laksh Singla <lakshsingla@gmail.com>
@LakshSingla
Copy link
Contributor Author

LakshSingla commented May 13, 2024

Before the patch

1   Benchmark                                  (channelDistributionString)  (keyGeneratorString)  (keyLength)  (numChannels)  (numRows)  (rowLength)  Mode  Cnt     Score     Error  Units
  1 FrameChannelMergerBenchmark.mergeChannels                  round_robin                random           20              2    5000000          100  avgt    5   926.859 ±  62.468  ms/op
  2 FrameChannelMergerBenchmark.mergeChannels                  round_robin                random           20             16    5000000          100  avgt    5  1425.121 ±  42.400  ms/op
  3 FrameChannelMergerBenchmark.mergeChannels                  round_robin            sequential           20              2    5000000          100  avgt    5  1020.367 ±  44.110  ms/op
  4 FrameChannelMergerBenchmark.mergeChannels                  round_robin            sequential           20             16    5000000          100  avgt    5  1794.424 ± 111.994  ms/op
  5 FrameChannelMergerBenchmark.mergeChannels                    clustered                random           20              2    5000000          100  avgt    5   909.969 ±  14.896  ms/op
  6 FrameChannelMergerBenchmark.mergeChannels                    clustered                random           20             16    5000000          100  avgt    5  1509.181 ± 421.195  ms/op
  7 FrameChannelMergerBenchmark.mergeChannels                    clustered            sequential           20              2    5000000          100  avgt    5   930.828 ±  80.848  ms/op
  8 FrameChannelMergerBenchmark.mergeChannels                    clustered            sequential           20             16    5000000          100  avgt    5  1147.251 ±  82.809  ms/op

After the patch

 19 Benchmark                                  (channelDistributionString)  (columnType)  (keyGeneratorString)  (keyLength)  (numChannels)  (numRows)  (rowLength)  Mode  Cnt      Score       Error  Units
 20 FrameChannelMergerBenchmark.mergeChannels                  round_robin        string                random           20              2    5000000          100  avgt    5    909.346 ±   121.474  ms/op
 21 FrameChannelMergerBenchmark.mergeChannels                  round_robin        string                random           20             16    5000000          100  avgt    5   1342.791 ±    50.920  ms/op
 22 FrameChannelMergerBenchmark.mergeChannels                  round_robin        string            sequential           20              2    5000000          100  avgt    5    970.077 ±    42.599  ms/op
 23 FrameChannelMergerBenchmark.mergeChannels                  round_robin        string            sequential           20             16    5000000          100  avgt    5   1702.374 ±    36.589  ms/op
 28 FrameChannelMergerBenchmark.mergeChannels                    clustered        string                random           20              2    5000000          100  avgt    5    937.706 ±    47.813  ms/op
 29 FrameChannelMergerBenchmark.mergeChannels                    clustered        string                random           20             16    5000000          100  avgt    5   1462.567 ±    49.337  ms/op
 30 FrameChannelMergerBenchmark.mergeChannels                    clustered        string            sequential           20              2    5000000          100  avgt    5   1080.969 ±   490.616  ms/op
 31 FrameChannelMergerBenchmark.mergeChannels                    clustered        string            sequential           20             16    5000000          100  avgt    5   1182.618 ±   125.105  ms/op

 24 FrameChannelMergerBenchmark.mergeChannels                  round_robin        nested                random           20              2    5000000          100  avgt    5   7027.089 ±   100.709  ms/op
 25 FrameChannelMergerBenchmark.mergeChannels                  round_robin        nested                random           20             16    5000000          100  avgt    5  16542.967 ±  1813.655  ms/op
 26 FrameChannelMergerBenchmark.mergeChannels                  round_robin        nested            sequential           20              2    5000000          100  avgt    5  12180.351 ± 17408.747  ms/op
 27 FrameChannelMergerBenchmark.mergeChannels                  round_robin        nested            sequential           20             16    5000000          100  avgt    5  26375.203 ± 10906.295  ms/op
 32 FrameChannelMergerBenchmark.mergeChannels                    clustered        nested                random           20              2    5000000          100  avgt    5   7408.249 ±   293.724  ms/op
 33 FrameChannelMergerBenchmark.mergeChannels                    clustered        nested                random           20             16    5000000          100  avgt    5  17861.245 ±  1497.691  ms/op
 34 FrameChannelMergerBenchmark.mergeChannels                    clustered        nested            sequential           20              2    5000000          100  avgt    5   7845.482 ±  1086.048  ms/op
 35 FrameChannelMergerBenchmark.mergeChannels                    clustered        nested            sequential           20             16    5000000          100  avgt    5  19398.613 ± 36239.777  ms/op

@LakshSingla LakshSingla deleted the msq-complex-sorting branch May 13, 2024 17:05
@LakshSingla LakshSingla mentioned this pull request May 22, 2024
1 task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 Area - Querying Area - Segment Format and Ser/De
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants