
add support for 'front coded' string dictionaries for smaller string columns #12277

Merged (46 commits) on Oct 26, 2022

Conversation

@clintropolis (Member) commented Feb 23, 2022

Closes #3922

Description

This PR adds a new way of storing STRING-typed columns, using an incremental encoding strategy called 'front coding'. Essentially, the sorted values are split into buckets; the first value in each bucket is written completely, and each remaining value is stored as a pair: the length of the prefix it shares with the bucket's first value (the prefix length), and the fragment that remains after that prefix. Using the quickstart wikipedia example, this results in nicely reduced segment sizes with relatively little performance penalty in most typical Druid queries.
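
As a rough illustration of the idea (not the actual FrontCodedIndexed/FrontCodedIndexedWriter code, which VByte-encodes the lengths and uses the bucket layout described under Design below), a bucket of sorted values could be front coded like this:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: front code one bucket of sorted values. The first value is kept whole;
// each later value is stored as (shared prefix length with the first value, remaining fragment).
class FrontCodingSketch
{
  static List<byte[]> encodeBucket(List<String> sortedValues)
  {
    final List<byte[]> out = new ArrayList<>();
    final byte[] first = sortedValues.get(0).getBytes(StandardCharsets.UTF_8);
    out.add(first); // first value written completely
    for (int i = 1; i < sortedValues.size(); i++) {
      final byte[] value = sortedValues.get(i).getBytes(StandardCharsets.UTF_8);
      int prefix = 0;
      while (prefix < first.length && prefix < value.length && first[prefix] == value[prefix]) {
        prefix++;
      }
      final byte[] fragment = new byte[value.length - prefix];
      System.arraycopy(value, prefix, fragment, 0, fragment.length);
      out.add(new byte[]{(byte) prefix}); // real code writes this as a VByte int, not a single byte
      out.add(fragment);                  // the part of the value after the shared prefix
    }
    return out;
  }
}
```

For example, a bucket of ["druid", "druidism", "drum"] stores "druid" whole, then (5, "ism") and (3, "m").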

Segment Size and Performance

Wikipedia "quickstart" segments:
[screenshot: segment size comparison]

GenericIndexed<ByteBuffer> "singleThreaded" vs FrontCodedIndexed with bucket sizes 4 and 16:

Benchmark                                            (indexType)  (numElements)  (numOperations)  (width)  Mode  Cnt        Score    Error  Units
FrontCodedIndexedBenchmark.get                           generic          10000            10000       16  avgt    5       12.842 ±  0.309  ns/op
FrontCodedIndexedBenchmark.get                           generic         100000            10000       16  avgt    5       14.173 ±  0.259  ns/op
FrontCodedIndexedBenchmark.get                     front-coded-4          10000            10000       16  avgt    5       39.418 ±  0.989  ns/op
FrontCodedIndexedBenchmark.get                     front-coded-4         100000            10000       16  avgt    5       43.614 ±  1.081  ns/op
FrontCodedIndexedBenchmark.get                    front-coded-16          10000            10000       16  avgt    5       68.667 ±  0.674  ns/op
FrontCodedIndexedBenchmark.get                    front-coded-16         100000            10000       16  avgt    5       72.695 ±  0.170  ns/op
FrontCodedIndexedBenchmark.indexOf                       generic          10000            10000       16  avgt    5      161.167 ±  1.945  ns/op
FrontCodedIndexedBenchmark.indexOf                       generic         100000            10000       16  avgt    5      229.376 ±  4.963  ns/op
FrontCodedIndexedBenchmark.indexOf                 front-coded-4          10000            10000       16  avgt    5      243.259 ±  0.290  ns/op
FrontCodedIndexedBenchmark.indexOf                 front-coded-4         100000            10000       16  avgt    5      377.807 ±  5.362  ns/op
FrontCodedIndexedBenchmark.indexOf                front-coded-16          10000            10000       16  avgt    5      252.050 ±  0.786  ns/op
FrontCodedIndexedBenchmark.indexOf                front-coded-16         100000            10000       16  avgt    5      381.047 ±  4.761  ns/op
FrontCodedIndexedBenchmark.iterator                      generic          10000            10000       16  avgt    5       39.341 ±  0.623  ns/op
FrontCodedIndexedBenchmark.iterator                      generic         100000            10000       16  avgt    5      384.242 ±  0.899  ns/op
FrontCodedIndexedBenchmark.iterator                front-coded-4          10000            10000       16  avgt    5       95.082 ± 18.971  ns/op
FrontCodedIndexedBenchmark.iterator                front-coded-4         100000            10000       16  avgt    5      690.636 ± 27.592  ns/op
FrontCodedIndexedBenchmark.iterator               front-coded-16          10000            10000       16  avgt    5       77.685 ± 29.205  ns/op
FrontCodedIndexedBenchmark.iterator               front-coded-16         100000            10000       16  avgt    5      606.071 ± 25.713  ns/op


FrontCodedIndexedBenchmark.get:encoded size              generic          10000            10000       16  avgt    5   240010.000           bytes
FrontCodedIndexedBenchmark.get:encoded size              generic         100000            10000       16  avgt    5  2400010.000           bytes
FrontCodedIndexedBenchmark.get:encoded size        front-coded-4          10000            10000       16  avgt    5   169992.000           bytes
FrontCodedIndexedBenchmark.get:encoded size        front-coded-4         100000            10000       16  avgt    5  1636900.000           bytes
FrontCodedIndexedBenchmark.get:encoded size       front-coded-16          10000            10000       16  avgt    5   164187.000           bytes
FrontCodedIndexedBenchmark.get:encoded size       front-coded-16         100000            10000       16  avgt    5  1564828.000           bytes

How this translates into queries has so far been measured with SqlBenchmark, though its generated data set does pretty poorly with this encoding because it is composed of numbers translated directly into strings, leaving few prefixes to take advantage of (383MB with GenericIndexed compared to 381MB with front-coded indexed). Any size advantage is therefore absent here, so this is likely near the worst case for this encoding strategy.

SELECT SUM(sumLongSequential), SUM(sumFloatNormal) FROM foo WHERE dimSequential NOT LIKE '%3'

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql        4           5000000           mmap              none        false  avgt    5     94.069 ±   1.454  ms/op
SqlBenchmark.querySql        4           5000000           mmap              none        force  avgt    5     45.142 ±   1.205  ms/op
SqlBenchmark.querySql        4           5000000           mmap     front-coded-4        false  avgt    5     90.089 ±   1.271  ms/op
SqlBenchmark.querySql        4           5000000           mmap     front-coded-4        force  avgt    5     44.842 ±   1.130  ms/op
SqlBenchmark.querySql        4           5000000           mmap    front-coded-16        false  avgt    5     90.121 ±   1.020  ms/op
SqlBenchmark.querySql        4           5000000           mmap    front-coded-16        force  avgt    5     44.965 ±   1.256  ms/op


-------

SELECT SUM(sumLongSequential), SUM(sumFloatNormal) FROM foo WHERE dimSequential = '311'

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql        5           5000000           mmap              none        false  avgt    5     23.650 ±   0.605  ms/op
SqlBenchmark.querySql        5           5000000           mmap              none        force  avgt    5     22.420 ±   0.496  ms/op
SqlBenchmark.querySql        5           5000000           mmap     front-coded-4        false  avgt    5     23.756 ±   0.766  ms/op
SqlBenchmark.querySql        5           5000000           mmap     front-coded-4        force  avgt    5     22.045 ±   0.551  ms/op
SqlBenchmark.querySql        5           5000000           mmap    front-coded-16        false  avgt    5     23.671 ±   0.603  ms/op
SqlBenchmark.querySql        5           5000000           mmap    front-coded-16        force  avgt    5     22.044 ±   0.638  ms/op


-------

SELECT SUM(sumLongSequential), SUM(sumFloatNormal) FROM foo WHERE dimSequential NOT LIKE '%3' AND maxLongUniform > 10

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql        6           5000000           mmap              none        false  avgt    5     48.876 ±   1.715  ms/op
SqlBenchmark.querySql        6           5000000           mmap              none        force  avgt    5     23.413 ±   0.713  ms/op
SqlBenchmark.querySql        6           5000000           mmap     front-coded-4        false  avgt    5     48.864 ±   1.748  ms/op
SqlBenchmark.querySql        6           5000000           mmap     front-coded-4        force  avgt    5     23.434 ±   0.901  ms/op
SqlBenchmark.querySql        6           5000000           mmap    front-coded-16        false  avgt    5     48.821 ±   1.737  ms/op
SqlBenchmark.querySql        6           5000000           mmap    front-coded-16        force  avgt    5     23.495 ±   0.684  ms/op


-------

SELECT
  SUM(sumLongSequential) FILTER(WHERE dimSequential = '311'),
  SUM(sumFloatNormal)
FROM foo WHERE dimSequential NOT LIKE '%3'

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql        7           5000000           mmap              none        false  avgt    5     96.444 ±   3.474  ms/op
SqlBenchmark.querySql        7           5000000           mmap              none        force  avgt    5     51.797 ±   1.244  ms/op
SqlBenchmark.querySql        7           5000000           mmap     front-coded-4        false  avgt    5     96.150 ±   3.059  ms/op
SqlBenchmark.querySql        7           5000000           mmap     front-coded-4        force  avgt    5     51.678 ±   0.869  ms/op
SqlBenchmark.querySql        7           5000000           mmap    front-coded-16        false  avgt    5    107.410 ±   2.924  ms/op
SqlBenchmark.querySql        7           5000000           mmap    front-coded-16        force  avgt    5     51.833 ±   1.156  ms/op


-------

(timeseries bunch of filtered aggs)

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql        8           5000000           mmap              none        false  avgt    5    893.378 ±   3.220  ms/op
SqlBenchmark.querySql        8           5000000           mmap              none        force  avgt    5    243.069 ±   3.936  ms/op
SqlBenchmark.querySql        8           5000000           mmap     front-coded-4        false  avgt    5    867.778 ±  13.031  ms/op
SqlBenchmark.querySql        8           5000000           mmap     front-coded-4        force  avgt    5    340.799 ±   4.697  ms/op
SqlBenchmark.querySql        8           5000000           mmap    front-coded-16        false  avgt    5    868.694 ±  18.088  ms/op
SqlBenchmark.querySql        8           5000000           mmap    front-coded-16        force  avgt    5    352.819 ±   2.469  ms/op


-------

SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo GROUP BY 1, 2

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql       10           5000000           mmap              none        false  avgt    5    430.592 ±  34.032  ms/op
SqlBenchmark.querySql       10           5000000           mmap              none        force  avgt    5    235.971 ±   3.405  ms/op
SqlBenchmark.querySql       10           5000000           mmap     front-coded-4        false  avgt    5    426.965 ±   9.557  ms/op
SqlBenchmark.querySql       10           5000000           mmap     front-coded-4        force  avgt    5    247.754 ±   2.635  ms/op
SqlBenchmark.querySql       10           5000000           mmap    front-coded-16        false  avgt    5    434.664 ±  10.247  ms/op
SqlBenchmark.querySql       10           5000000           mmap    front-coded-16        force  avgt    5    250.683 ±   3.340  ms/op


-------

SELECT dimSequential, dimZipf, SUM(sumLongSequential), COUNT(*) FROM foo GROUP BY 1, 2

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql       11           5000000           mmap              none        false  avgt    5    433.815 ±   7.673  ms/op
SqlBenchmark.querySql       11           5000000           mmap              none        force  avgt    5    255.151 ±   1.853  ms/op
SqlBenchmark.querySql       11           5000000           mmap     front-coded-4        false  avgt    5    442.550 ±   9.269  ms/op
SqlBenchmark.querySql       11           5000000           mmap     front-coded-4        force  avgt    5    263.782 ±   3.357  ms/op
SqlBenchmark.querySql       11           5000000           mmap    front-coded-16        false  avgt    5    441.626 ±   8.954  ms/op
SqlBenchmark.querySql       11           5000000           mmap    front-coded-16        force  avgt    5    268.237 ±   2.834  ms/op


-------

SELECT dimZipf FROM foo GROUP BY 1

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql       12           5000000           mmap              none        false  avgt    5     42.031 ±   1.408  ms/op
SqlBenchmark.querySql       12           5000000           mmap              none        force  avgt    5     24.031 ±   0.881  ms/op
SqlBenchmark.querySql       12           5000000           mmap     front-coded-4        false  avgt    5     42.393 ±   0.648  ms/op
SqlBenchmark.querySql       12           5000000           mmap     front-coded-4        force  avgt    5     24.055 ±   1.008  ms/op
SqlBenchmark.querySql       12           5000000           mmap    front-coded-16        false  avgt    5     42.255 ±   1.134  ms/op
SqlBenchmark.querySql       12           5000000           mmap    front-coded-16        force  avgt    5     22.201 ±   1.120  ms/op


-------

(big union)

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql       19           5000000           mmap              none        false  avgt    5    365.313 ±   5.432  ms/op
SqlBenchmark.querySql       19           5000000           mmap              none        force  avgt    5    315.686 ±   6.720  ms/op
SqlBenchmark.querySql       19           5000000           mmap     front-coded-4        false  avgt    5    477.117 ±  67.235  ms/op
SqlBenchmark.querySql       19           5000000           mmap     front-coded-4        force  avgt    5    404.028 ±   8.931  ms/op
SqlBenchmark.querySql       19           5000000           mmap    front-coded-16        false  avgt    5    489.550 ±  11.902  ms/op
SqlBenchmark.querySql       19           5000000           mmap    front-coded-16        force  avgt    5    443.758 ±   9.031  ms/op


-------

SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimUniform NOT LIKE '%3' GROUP BY 1, 2

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql       21           5000000           mmap              none        false  avgt    5    444.995 ±   5.940  ms/op
SqlBenchmark.querySql       21           5000000           mmap              none        force  avgt    5    273.133 ±   1.405  ms/op
SqlBenchmark.querySql       21           5000000           mmap     front-coded-4        false  avgt    5    450.684 ±  13.259  ms/op
SqlBenchmark.querySql       21           5000000           mmap     front-coded-4        force  avgt    5    279.336 ±   2.098  ms/op
SqlBenchmark.querySql       21           5000000           mmap    front-coded-16        false  avgt    5    456.444 ±   9.268  ms/op
SqlBenchmark.querySql       21           5000000           mmap    front-coded-16        force  avgt    5    283.369 ±   2.217  ms/op


-------

SELECT dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential = '311' GROUP BY 1 ORDER BY 1

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql       22           5000000           mmap              none        false  avgt    5     15.510 ±   0.522  ms/op
SqlBenchmark.querySql       22           5000000           mmap              none        force  avgt    5     15.380 ±   0.475  ms/op
SqlBenchmark.querySql       22           5000000           mmap     front-coded-4        false  avgt    5     15.565 ±   0.652  ms/op
SqlBenchmark.querySql       22           5000000           mmap     front-coded-4        force  avgt    5     15.372 ±   0.504  ms/op
SqlBenchmark.querySql       22           5000000           mmap    front-coded-16        false  avgt    5     15.544 ±   0.667  ms/op
SqlBenchmark.querySql       22           5000000           mmap    front-coded-16        force  avgt    5     15.496 ±   0.441  ms/op


-------

SELECT * FROM foo

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql       23           5000000           mmap              none        force  avgt    5  11097.105 ±  40.113  ms/op
SqlBenchmark.querySql       23           5000000           mmap     front-coded-4        force  avgt    5  11968.038 ± 125.990  ms/op
SqlBenchmark.querySql       23           5000000           mmap    front-coded-16        force  avgt    5  13985.780 ±  74.495  ms/op


-------

SELECT * FROM foo WHERE dimSequential IN ('1', '2', '3', '4', '5', '10', '11', '20', '21', '23', '40', '50', '64', '70', '100')

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql       24           5000000           mmap              none        force  avgt    5    255.461 ±   2.556  ms/op
SqlBenchmark.querySql       24           5000000           mmap     front-coded-4        force  avgt    5    258.501 ±   2.187  ms/op
SqlBenchmark.querySql       24           5000000           mmap    front-coded-16        force  avgt    5    298.738 ±   2.915  ms/op


-------

SELECT * FROM foo WHERE dimSequential > '10' AND dimSequential < '8500'

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql       25           5000000           mmap              none        force  avgt    5   9171.879 ±  25.495  ms/op
SqlBenchmark.querySql       25           5000000           mmap     front-coded-4        force  avgt    5   8819.167 ±  50.877  ms/op
SqlBenchmark.querySql       25           5000000           mmap    front-coded-16        force  avgt    5  10618.384 ± 124.553  ms/op


-------

SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential IN ('1', '2', '3', '4', '5', '10', '11', '20', '21', '23', '40', '50', '64', '70', '100') GROUP BY 1, 2

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql       26           5000000           mmap              none        false  avgt    5     22.775 ±   0.811  ms/op
SqlBenchmark.querySql       26           5000000           mmap              none        force  avgt    5     18.762 ±   0.654  ms/op
SqlBenchmark.querySql       26           5000000           mmap     front-coded-4        false  avgt    5     22.925 ±   0.772  ms/op
SqlBenchmark.querySql       26           5000000           mmap     front-coded-4        force  avgt    5     18.920 ±   0.610  ms/op
SqlBenchmark.querySql       26           5000000           mmap    front-coded-16        false  avgt    5     22.989 ±   0.813  ms/op
SqlBenchmark.querySql       26           5000000           mmap    front-coded-16        force  avgt    5     18.989 ±   0.571  ms/op


-------

SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential > '10' AND dimSequential < '8500' GROUP BY 1, 2

Benchmark              (query)  (rowsPerSegment)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt      Score     Error  Units
SqlBenchmark.querySql       27           5000000           mmap              none        false  avgt    5    377.893 ±  13.541  ms/op
SqlBenchmark.querySql       27           5000000           mmap              none        force  avgt    5    228.251 ±   2.652  ms/op
SqlBenchmark.querySql       27           5000000           mmap     front-coded-4        false  avgt    5    387.915 ±  11.770  ms/op
SqlBenchmark.querySql       27           5000000           mmap     front-coded-4        force  avgt    5    234.894 ±   2.848  ms/op
SqlBenchmark.querySql       27           5000000           mmap    front-coded-16        false  avgt    5    400.435 ±  10.410  ms/op
SqlBenchmark.querySql       27           5000000           mmap    front-coded-16        force  avgt    5    237.497 ±   2.143  ms/op

I have yet to perform larger scale testing on actual clusters with real workloads, but the benchmark results look very promising at this point and show very little overhead at query time, with a decent chance the reduced segment sizes will more than make up for it.

Design

The encoding itself is done within a new Indexed implementation, FrontCodedIndexed, which contains all of the methods for reading and writing the buckets of values. I adapted 'variable byte' encoding for integer values from JavaFastPFOR to write both string value lengths and prefix lengths.
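
For reference, here is a hedged sketch of what variable-byte integer coding looks like (the exact bit convention in VByte/JavaFastPFOR may differ; this only illustrates variable-length, non-negative ints of 1 to 5 bytes):

```java
import java.nio.ByteBuffer;

// Sketch: 7 data bits per byte, with the high bit set on the final byte to mark termination.
class VByteSketch
{
  static void writeInt(ByteBuffer buffer, int value)
  {
    while ((value & ~0x7F) != 0) {
      buffer.put((byte) (value & 0x7F)); // 7 low bits; clear high bit means "more bytes follow"
      value >>>= 7;
    }
    buffer.put((byte) (value | 0x80));   // final byte: high bit marks the end
  }

  static int readInt(ByteBuffer buffer)
  {
    int value = 0;
    int shift = 0;
    while (true) {
      final byte b = buffer.get();
      value |= (b & 0x7F) << shift;
      if ((b & 0x80) != 0) {            // high bit set: this was the last byte
        return value;
      }
      shift += 7;
    }
  }
}
```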

string layout:

| length    | value  |
|-----------|--------|
| vbyte int | byte[] |

bucket layout:

| first string | prefix length | string fragment | ... | prefix length | string fragment |
|--------------|---------------|-----------------|-----|---------------|-----------------|
| byte[]       | vbyte int     | byte[]          | ... | vbyte int     | byte[]          |

front coded indexed layout:

| version | bucket size | has null? | number of values | size of "offsets" + "buckets" | "offsets" | "buckets" |
|---------|-------------|-----------|------------------|-------------------------------|-----------|-----------|
| byte    | byte        | byte      | vbyte int        | vbyte int                     | int[]     | bucket[]  |

Note that the "offsets" store the starting location of all buckets beyond the first (the first bucket's offset is known to be the end of the "offsets" section). The "offsets" are stored as plain integer values rather than vbyte encoded to allow fast access to the bucket positions, but they are probably a good candidate for delta-encoded byte packing to further decrease their size.
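
To make the read path concrete, here is a simplified, hypothetical reader sketch of how get(index) can locate and rebuild a value under this layout. It ignores the null entry and uses plain int lengths where the real FrontCodedIndexed uses VByte:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative only: fixed power-of-two bucket size, plain int lengths, no null handling.
class FrontCodedReaderSketch
{
  private final ByteBuffer buffer;
  private final int offsetsPosition;   // start of the int[] of bucket offsets
  private final int bucketsPosition;   // start of the first bucket
  private final int div;               // log2(bucketSize)
  private final int rem;               // bucketSize - 1

  FrontCodedReaderSketch(ByteBuffer buffer, int bucketSize, int offsetsPosition, int bucketsPosition)
  {
    this.buffer = buffer;
    this.offsetsPosition = offsetsPosition;
    this.bucketsPosition = bucketsPosition;
    this.div = Integer.numberOfTrailingZeros(bucketSize);
    this.rem = bucketSize - 1;
  }

  byte[] get(int index)
  {
    final int bucket = index >> div;          // which bucket holds this index
    final int indexInBucket = index & rem;    // position within that bucket
    // bucket 0 starts immediately after the offsets; later buckets use the stored offsets
    final int bucketStart = bucket == 0
        ? bucketsPosition
        : bucketsPosition + buffer.getInt(offsetsPosition + Integer.BYTES * (bucket - 1));

    final ByteBuffer view = buffer.asReadOnlyBuffer();
    view.position(bucketStart);
    final byte[] first = readValue(view);     // first value is stored completely
    if (indexInBucket == 0) {
      return first;
    }
    byte[] value = null;
    for (int i = 1; i <= indexInBucket; i++) {
      final int prefixLength = view.getInt(); // real layout: vbyte-encoded
      final byte[] fragment = readValue(view);
      value = Arrays.copyOf(first, prefixLength + fragment.length);
      System.arraycopy(fragment, 0, value, prefixLength, fragment.length);
    }
    return value;
  }

  private static byte[] readValue(ByteBuffer view)
  {
    final int length = view.getInt();         // real layout: vbyte-encoded
    final byte[] bytes = new byte[length];
    view.get(bytes);
    return bytes;
  }
}
```

The shift-and-mask math is why fixed, power-of-two bucket sizes are used: the bucket holding any index, and the position within it, fall directly out of `index >> div` and `index & rem`.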

Using it

This functionality is enabled via a new IndexSpec property, stringDictionaryEncoding, which can be set to {"type":"frontCoded", "bucketSize": 4}, {"type":"frontCoded", "bucketSize": 16}, or similar, to instruct indexing tasks to write segments with compressed dictionaries using bucket size 4 or 16 respectively ({"type":"utf8"} is the default).
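
For example, the setting would sit inside the indexSpec of an ingestion task's tuningConfig, roughly like this (the surrounding fields are illustrative, not a complete spec):

```json
{
  "tuningConfig": {
    "indexSpec": {
      "stringDictionaryEncoding": {
        "type": "frontCoded",
        "bucketSize": 4
      }
    }
  }
}
```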

This mode is not set as the default yet because any segments written this way will be unreadable by older versions of Druid, so care must be taken before migrating to this encoding. Additionally, this needs a lot more testing and measurement to ensure that it is genuinely better in most cases before making it the default, but it looks pretty promising.

Testing

Besides the direct tests on FrontCodedIndexed and VByte, I also wired a front-coded segment into both BaseFilterTest and QueryTestHelper, which together provide a rather wide set of test coverage for a variety of scenarios. This process found a number of bugs in my initial commits, so I feel reasonably confident that things are correct at this point.

Future work

Before I started coding this, in addition to the paper linked in #3922 (https://arxiv.org/pdf/1101.5506.pdf), I also read https://arxiv.org/pdf/1911.08372.pdf, a newer paper by one of the same authors, and stumbled upon https://link.springer.com/content/pdf/10.1007/s00778-020-00620-x.pdf. All of these detail additional (much fancier) improvements that can be made to this strategy by further coding the string values (https://en.wikipedia.org/wiki/Re-Pair seems to be the primary focus in these papers). It would probably be worth investigating what additional size improvements can be gained, and at what cost.

Additionally, it seems ideal to be able to vary which encoding is used per column instead of setting it at the segment level (this seems true of other types of compression as well). That would allow collecting statistics at indexing time to determine how likely this encoding is to be useful, such as minimum value cardinality thresholds or similar (akin to the 'auto' encoding available for long columns).


Key changed/added classes in this PR
  • VByte
  • FrontCodedIndexed
  • FrontCodedIndexedWriter
  • StringDimensionMergerV9
  • DictionaryEncodedColumnPartSerde
  • DictionaryEncodedStringColumnIndexSupplier (internal classes split out into several new files)
  • StringFrontCodedDictionaryEncodedColumn
  • StringFrontCodedDictionaryEncodedColumnSupplier
  • StringFrontCodedColumnIndexSupplier
  • NestedDataColumnMerger
  • NestedDataColumnSupplier
  • CompressedNestedDataComplexColumn
  • NestedFieldLiteralColumnIndexSupplier
  • NestedFieldLiteralDictionaryEncodedColumn

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • been tested in a test Druid cluster.

@lgtm-com (bot) commented Feb 23, 2022

This pull request introduces 2 alerts when merging aadf6cd into b1640a7 - view on LGTM.com

new alerts:

  • 2 for Subtle call to inherited method

@clintropolis (Member, Author)

Some additional less scientific measurements, using a 10GB file of the nyc taxi dataset with all of the columns stored as strings:
[screenshot: segment size comparison]

grouping performance seems competitive:
[screenshots: grouping query timings]

select * does show a performance decrease as the earlier benchmarks suggested:
[screenshots: select * query timings]

I still haven't had the chance to spend any time optimizing the code, but the size savings definitely make this feel worth considering for clusters where the typical workload does not include queries that hit a large number of columns, such as "wide" scans ("select *", etc.) or group bys over many columns.

@clintropolis (Member, Author)

There aren't any explicit conflicts, but this PR needs to be updated after #12315 to implement the new methods. I'll see if I can consolidate some of the code a bit better when I fix it up.

@lgtm-com (bot) commented Sep 21, 2022

This pull request introduces 1 alert when merging 28cf810 into 331e6d7 - view on LGTM.com

new alerts:

  • 1 for Inconsistent equals and hashCode

@clintropolis (Member, Author)

I ran a few of the SqlNestedDataBenchmarks that exercise string columns, just to spot check, and things look good there too:

SELECT string1, SUM(long1) FROM foo GROUP BY 1 ORDER BY 2
SELECT JSON_VALUE(nested, '$.nesteder.string1'), SUM(JSON_VALUE(nested, '$.long1' RETURNING BIGINT)) FROM foo GROUP BY 1 ORDER BY 2

Benchmark                        (query)  (rowsPerSegment)  (stringEncoding)  (vectorize)  Mode  Cnt    Score   Error  Units
SqlNestedDataBenchmark.querySql        6           5000000              none        false  avgt    5  229.141 ± 4.357  ms/op
SqlNestedDataBenchmark.querySql        6           5000000              none        force  avgt    5  158.286 ± 1.982  ms/op
SqlNestedDataBenchmark.querySql        6           5000000     front-coded-4        false  avgt    5  226.019 ± 2.990  ms/op
SqlNestedDataBenchmark.querySql        6           5000000     front-coded-4        force  avgt    5  154.666 ± 0.682  ms/op
SqlNestedDataBenchmark.querySql        6           5000000    front-coded-16        false  avgt    5  218.805 ± 3.507  ms/op
SqlNestedDataBenchmark.querySql        6           5000000    front-coded-16        force  avgt    5  159.220 ± 9.396  ms/op
SqlNestedDataBenchmark.querySql        7           5000000              none        false  avgt    5  379.591 ± 6.253  ms/op
SqlNestedDataBenchmark.querySql        7           5000000              none        force  avgt    5  196.781 ± 3.562  ms/op
SqlNestedDataBenchmark.querySql        7           5000000     front-coded-4        false  avgt    5  369.041 ± 4.383  ms/op
SqlNestedDataBenchmark.querySql        7           5000000     front-coded-4        force  avgt    5  197.589 ± 3.049  ms/op
SqlNestedDataBenchmark.querySql        7           5000000    front-coded-16        false  avgt    5  379.980 ± 2.840  ms/op
SqlNestedDataBenchmark.querySql        7           5000000    front-coded-16        force  avgt    5  198.248 ± 4.503  ms/op

SELECT SUM(long1) FROM foo WHERE string1 = '10000' OR string1 = '1000'
SELECT SUM(JSON_VALUE(nested, '$.long1' RETURNING BIGINT)) FROM foo WHERE JSON_VALUE(nested, '$.nesteder.string1') = '10000' OR JSON_VALUE(nested, '$.nesteder.string1') = '1000'

Benchmark                        (query)  (rowsPerSegment)  (stringEncoding)  (vectorize)  Mode  Cnt    Score   Error  Units
SqlNestedDataBenchmark.querySql       10           5000000              none        false  avgt    5   11.487 ± 0.236  ms/op
SqlNestedDataBenchmark.querySql       10           5000000              none        force  avgt    5   11.472 ± 0.201  ms/op
SqlNestedDataBenchmark.querySql       10           5000000     front-coded-4        false  avgt    5   11.509 ± 0.198  ms/op
SqlNestedDataBenchmark.querySql       10           5000000     front-coded-4        force  avgt    5   11.510 ± 0.297  ms/op
SqlNestedDataBenchmark.querySql       10           5000000    front-coded-16        false  avgt    5   11.480 ± 0.288  ms/op
SqlNestedDataBenchmark.querySql       10           5000000    front-coded-16        force  avgt    5   11.458 ± 0.270  ms/op
SqlNestedDataBenchmark.querySql       11           5000000              none        false  avgt    5   11.650 ± 0.274  ms/op
SqlNestedDataBenchmark.querySql       11           5000000              none        force  avgt    5   11.674 ± 0.254  ms/op
SqlNestedDataBenchmark.querySql       11           5000000     front-coded-4        false  avgt    5   11.681 ± 0.312  ms/op
SqlNestedDataBenchmark.querySql       11           5000000     front-coded-4        force  avgt    5   11.672 ± 0.340  ms/op
SqlNestedDataBenchmark.querySql       11           5000000    front-coded-16        false  avgt    5   11.792 ± 0.383  ms/op
SqlNestedDataBenchmark.querySql       11           5000000    front-coded-16        force  avgt    5   11.809 ± 0.422  ms/op


SELECT long1, SUM(double3) FROM foo WHERE string1 = '10000' OR string1 = '1000' GROUP BY 1 ORDER BY 2
SELECT JSON_VALUE(nested, '$.long1' RETURNING BIGINT), SUM(JSON_VALUE(nested, '$.nesteder.double3' RETURNING DOUBLE)) FROM foo WHERE JSON_VALUE(nested, '$.nesteder.string1') = '10000' OR JSON_VALUE(nested, '$.nesteder.string1') = '1000' GROUP BY 1 ORDER BY 2

Benchmark                        (query)  (rowsPerSegment)  (stringEncoding)  (vectorize)  Mode  Cnt    Score   Error  Units
SqlNestedDataBenchmark.querySql       16           5000000              none        false  avgt    5  126.009 ± 1.829  ms/op
SqlNestedDataBenchmark.querySql       16           5000000              none        force  avgt    5  125.930 ± 2.802  ms/op
SqlNestedDataBenchmark.querySql       16           5000000     front-coded-4        false  avgt    5  125.991 ± 1.981  ms/op
SqlNestedDataBenchmark.querySql       16           5000000     front-coded-4        force  avgt    5  126.098 ± 4.202  ms/op
SqlNestedDataBenchmark.querySql       16           5000000    front-coded-16        false  avgt    5  125.795 ± 6.560  ms/op
SqlNestedDataBenchmark.querySql       16           5000000    front-coded-16        force  avgt    5  126.172 ± 3.807  ms/op
SqlNestedDataBenchmark.querySql       17           5000000              none        false  avgt    5  126.375 ± 2.382  ms/op
SqlNestedDataBenchmark.querySql       17           5000000              none        force  avgt    5  125.585 ± 0.396  ms/op
SqlNestedDataBenchmark.querySql       17           5000000     front-coded-4        false  avgt    5  125.678 ± 2.668  ms/op
SqlNestedDataBenchmark.querySql       17           5000000     front-coded-4        force  avgt    5  125.355 ± 2.104  ms/op
SqlNestedDataBenchmark.querySql       17           5000000    front-coded-16        false  avgt    5  127.011 ± 5.057  ms/op
SqlNestedDataBenchmark.querySql       17           5000000    front-coded-16        force  avgt    5  126.835 ± 3.172  ms/op

Segment sizes for this benchmark are ~3.4GB instead of 3.6GB, but it has the same issue as SqlBenchmark: the data generator produces stringified numbers, so there is limited ability to benefit from this encoding.

@imply-cheddar (Contributor) left a comment

A few questions/comments here and there, but nothing that should block this. If there's one thing we should do, it's probably still having an encoding for the GenericIndexed so that deployment of this code will give us a rollback point for when we stop persisting the columns that borrow the GenericIndexed version.


private ImmutableBitmap getBitmapForValue()
{
final ByteBuffer valueUtf8 = value == null ? null : ByteBuffer.wrap(StringUtils.toUtf8(value));
Contributor

Do we ever call .forValue(String) and then throw away the result? If not, it should be safe to move this to line 69 and then close over it instead. That way we don't convert to utf8 bytes multiple times (right now, it will happen on every call to estimateSelectivity and computeBitmapResult)
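
A rough sketch of the hoisting being suggested; the class and method names here are illustrative stand-ins, not the actual Druid filter classes:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;

// Illustrative only: the UTF-8 conversion happens once in forValue(...), and later calls reuse
// the captured buffer instead of re-encoding the String each time.
class ValueIndexSketch
{
  @Nullable
  private final ByteBuffer valueUtf8; // converted once, up front

  private ValueIndexSketch(@Nullable ByteBuffer valueUtf8)
  {
    this.valueUtf8 = valueUtf8;
  }

  static ValueIndexSketch forValue(@Nullable String value)
  {
    return new ValueIndexSketch(
        value == null ? null : ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8))
    );
  }

  @Nullable
  ByteBuffer getValueUtf8()
  {
    // estimateSelectivity / computeBitmapResult would both read from this shared, precomputed form
    return valueUtf8 == null ? null : valueUtf8.asReadOnlyBuffer();
  }
}
```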

Member Author

The only place we seem to truly throw it away is in QueryableIndexStorageAdapter.canVectorize, which calls Filter.getBitmapColumnIndex (which for SelectorFilter and LikeFilter ends up calling forValue) to see if the filter supports bitmap indexes (which means it can be vectorized even if the value matcher hasn't been vectorized), and a place in the native 'search' query doing a similar check in UseIndexesStrategy - though for that one it could probably be re-used, since it's making ColumnIndexSelector twice and calling Filter.getBitmapColumnIndex twice (this should be fixed, but I don't really want to do that in this PR, maybe later when I move the index supplier to BaseColumn).

In most cases I think computeBitmapResult is only going to be called once when building a cursor, and estimateSelectivity is only used by search queries, so I'm not sure it makes a huge difference either way?

Comment on lines 239 to 240
// Note: we can rely on indexOf returning (-(insertion point) - 1), even though Indexed doesn't
// guarantee it, because "dictionary" comes from GenericIndexed singleThreaded().
Contributor

I question if this comment is still accurate.

Member Author

the (-(insertion point) - 1) thing needs to be true, though the GenericIndexed part is no longer strictly accurate since it can also come from FrontCodedIndexed

Contributor

Could you modify the comment to be accurate?


public class EncodedStringDictionaryWriter implements DictionaryWriter<String>
{
public static final byte VERSION = Byte.MAX_VALUE; // hopefully GenericIndexed never makes a version this high...
Contributor

Why does it matter? Even if GenericIndexed does, the thing that matters is that the version of the column itself never collides. In order to ensure that, we should really start persisting all of the columns with the newly incremented version (even if they are still set to persist with GenericIndexed), but the downside is that it doesn't allow for rollback. So, instead, the new version can hopefully still persist a GenericIndexed version; we continue to make things with the old version in case we are in a state where we might want rollback, and then in some future version we should just never persist a String column without adding some String-column-specific version id.

Member Author

so, I did not actually introduce a new version of the whole string column format in this PR, instead opting to just carve out the ability to customize how we write and read the value dictionary, and re-using everything else in the format.

To update the actual string column version: since the version byte is currently also overloaded with whether or not the int column part is compressed, I would need to either:

  • add two versions for compressed/uncompressed
  • make more dramatic modifications to the column part serde so that compression is not stored in the version byte
  • or just write a totally new string column part serde for v4 to clean up some of this stuff (this might be best actually, but I'm unsure I want to do that as part of this PR)

When not using front-coding it was important to make sure we keep writing columns exactly the same so that clusters can roll back to older versions and still read their segments, so I was trying to minimize the amount of code needed to support this.

The way I've done it here (assuming I actually add the code to handle StringEncodingStrategy.UTF8_ID as a GenericIndexed per your other comment), then at some point in the future we could just start always writing EncodedStringDictionaryWriter.VERSION before the encoding id, and get by without having to change the actual whole column version, though we have to spend that byte which would effectively become static until we do.

Though writing all of this out, I'm thinking a bit longer term it's probably best to make a truly new version of the string column to try to clean some stuff up, like decoupling column version from compression, etc., but I'd like to think a bit more about what else might be missing (index stuff in particular seems like it could use some thought, since we've opened up a lot of future possibilities with our recent refactoring in the area).

}

// if the bucket buffer is full, write the bucket
if (numWritten > 0 && (numWritten % bucketSize) == 0) {
Contributor

I don't know that it really matters, but it looks like you are buffering a bucket and then writing the whole bucket out. I don't think that's strictly necessary, is it? That is, I think we can do a completely streaming write? Or is there some state that you need to have which you can only get once the bucket is full? I think even the offset written into the header could be figured out in a streaming fashion?

I don't think it really matters, so I'm not actually asking you to change this. Just wanting to validate my understanding.

Member Author

You're right, it's not strictly necessary. If I added versions of VByte.writeInt that accepted WriteOutBytes (the current method uses ByteBuffer), then I think it would work without too much trouble; I would just need to track the first bucket value so the prefixing can be done on the fly, and track the number of bytes written in order to write the bucket offsets. The shape this ended up in grew out of my experiments, where it was easier to write tests by being able to write to and then read from ByteBuffers. I'll consider playing with this in the future to try to simplify things.

Contributor

Where do we pick the optimal bucket "key frames"? It looks like we write buckets until they are full. Doesn't this put us at the mercy of the data distribution rather than trying to find (near) optimal key values? That is, in the worst case, we could have a list of "a", "b", "ba", "bab", "bob", ... and our bucket would start with "a" and there would be no compression. If we instead made a 1-value bucket and then started the next with "b", we'd save 3 bytes from the following values.

The papers linked in the PR suggest a sampling algorithm, but it does appear to make multiple passes over the data, and thus can't be done streaming. Or, is this one of the future enhancements mentioned in the description? If it is, does this encoding support variable-sized buckets? We'd still have n buckets, but sizes would vary around the (value count / n) mean.

Member Author

This implementation only supports fixed bucket sizes because it makes finding the value at an arbitrary index a lot easier: we can do some math to know which bucket any given index falls in.

In query processing, the get method can be used in an effectively random-access manner - in a dictionary encoded string column, the 'column' is stored as integers of the dictionary ids, and then, as late as possible, the dictionary id is looked up and replaced with the actual value.

Member Author

Btw, I definitely think there is room to explore even when constrained to fixed bucket sizes. For example, I'm currently prefixing all values based on the first bucket value, but since everything is effectively run-length encoded within a bucket anyway, I could imagine that with a slight bit more complication the prefix could be relative to the value immediately preceding it in the bucket instead of the first value of the bucket.

I haven't explored this yet, but it would somewhat help the scenario you describe, and is probably worth it if it doesn't cost too much performance.

Comment on lines 141 to 146
return Byte.BYTES +
Byte.BYTES +
Byte.BYTES +
VByte.estimateIntSize(numWritten) +
VByte.estimateIntSize(headerAndValues) +
headerAndValues;
Contributor

I get scared of size estimates; they are wrong sometimes. You could alternatively build the header bytes (the 3 bytes and 2 VBytes) and then look at how many bytes that used to get the size of the header, yes?

Member Author

Oh, maybe I didn't name this method very well, since it produces the exact size; what you're saying would work too, though.

if (encodingId == StringEncodingStrategy.FRONT_CODED_ID) {
readFrontCodedColumn(buffer, builder, rVersion, rFlags, hasMultipleValues);
} else {
throw new ISE("impossible, unknown encoding strategy id: %s", encodingId);
Contributor

We should likely also support an ID for the GenericIndexed based approach so that in a future release we can protect from GenericIndexed getting a new version number by persisting those columns as just another encoding here.

@gianm (Contributor) left a comment

Looks great! No real serious comments. A lot of requests for javadocs and such 🙂

The testing strategy looks good. Adding it to the query tests & filter tests, as well as having a front-coding-specific test, is a good idea.

@@ -108,6 +108,15 @@ public static String fromUtf8(final ByteBuffer buffer)
return StringUtils.fromUtf8(buffer, buffer.remaining());
}

@Nullable
public static String fromUtf8Nullable(@Nullable final ByteBuffer buffer)
Contributor

Javadoc should specify whether the buffer position is advanced, and by how much.

public class VByte
{
/**
* Read a variable byte (vbyte) encoded integer from a {@link ByteBuffer} at the current position.
Contributor

Javadoc should specify whether the buffer position is advanced, and by how much.

}

/**
* Write a variable byte (vbyte) encoded integer to a {@link ByteBuffer} at the current position.
Contributor

Javadoc should specify whether the buffer position is advanced, and by how much.

@@ -384,7 +387,10 @@ public void writeIndexes(@Nullable List<IntBuffer> segmentRowNumConversions) thr
}
}


protected DictionaryWriter<T> getWriter(String fileName)
Contributor

IMO, a good compromise here would be to add javadocs to this method explaining the contract for overriding it, at least noting when it gets called.

Also: makeDictionaryWriter seems like a better name than getWriter. It emphasizes that the method creates something, not fetches something. And that it's about the dictionary. (There's other writers beyond the dictionary writer.)

@@ -371,7 +372,8 @@ public static <T, QueryType extends Query<T>> List<QueryRunner<T>> makeQueryRunn
new QueryableIndexSegment(noRollupMMappedTestIndex, SEGMENT_ID),
"noRollupMMappedTestIndex"
),
makeQueryRunner(factory, new QueryableIndexSegment(mergedRealtimeIndex, SEGMENT_ID), "mergedRealtimeIndex")
makeQueryRunner(factory, new QueryableIndexSegment(mergedRealtimeIndex, SEGMENT_ID), "mergedRealtimeIndex"),
makeQueryRunner(factory, new QueryableIndexSegment(frontCodedMappedTestIndex, SEGMENT_ID), "frontCodedMappedTestIndex")
Contributor

frontCodedMMappedTestIndex (spelling)

import javax.annotation.Nullable;
import java.util.Objects;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringEncodingStrategy.Utf8.class)
Contributor

Is it necessary to have a defaultImpl? I always try to avoid it whenever possible, because it has this weird behavior where an unregistered type gets assigned to the default impl. It means that typos give you the default impl silently, which trips people up.
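
A sketch of the alternative: register the subtypes explicitly and omit defaultImpl, so an unrecognized type name fails at deserialization time instead of silently becoming the UTF-8 strategy. The class and type names below are illustrative, not the exact classes in this PR:

```java
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

// Sketch only: with no defaultImpl, Jackson throws for an unrecognized "type" value rather than
// silently falling back to the utf8 strategy.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
    @JsonSubTypes.Type(value = EncodingStrategySketch.Utf8.class, name = "utf8"),
    @JsonSubTypes.Type(value = EncodingStrategySketch.FrontCoded.class, name = "frontCoded")
})
interface EncodingStrategySketch
{
  class Utf8 implements EncodingStrategySketch
  {
  }

  class FrontCoded implements EncodingStrategySketch
  {
  }
}
```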

}
}

@JsonTypeName(FRONT_CODED)
Contributor

nit, I don't think we need this and @JsonSubTypes.Type.

* dictionary, which 'delta encodes' strings (instead of {@link org.apache.druid.segment.data.GenericIndexed} like
* {@link StringDictionaryEncodedColumn}).
*
* This class is otherwise nearly identical to {@link StringDictionaryEncodedColumn} other than the dictionary
Contributor

Yeah, this is a pretty big class to have duplicated. It would be good to do the follow-up work to figure out how to consolidate them.

For now, can you add a comment to StringDictionaryEncodedColumn that reminds people to make any changes here too?

{
final int adjustedIndex;
// due to vbyte encoding, the null value is not actually stored in the bucket (no negative values), so we adjust
// the index
Contributor

I guess that @imply-cheddar means this:

  • null is always the value for dictionary id 0
  • the value for dictionary id N (where N > 0) is position N - 1 in the dictionary

Seems fine, although in the interest of being able to share more code between the front-coded dictionary and legacy dictionary impl, it's ok to keep it the way you have it in the patch. It's closer to the legacy impl.
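
A minimal, self-contained sketch of that id adjustment (the class below is a stand-in for illustration, not the actual column code):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import javax.annotation.Nullable;

// Illustrative only: when hasNull is true, dictionary id 0 is reserved for null and id N (N > 0)
// maps to position N - 1 of the value dictionary, since null is never written into a bucket.
class NullAdjustingDictionary
{
  private final boolean hasNull;
  private final List<ByteBuffer> values; // stand-in for the FrontCodedIndexed dictionary

  NullAdjustingDictionary(boolean hasNull, List<ByteBuffer> values)
  {
    this.hasNull = hasNull;
    this.values = values;
  }

  @Nullable
  String lookupName(int dictionaryId)
  {
    if (hasNull && dictionaryId == 0) {
      return null;
    }
    final ByteBuffer utf8 = values.get(hasNull ? dictionaryId - 1 : dictionaryId);
    return StandardCharsets.UTF_8.decode(utf8.asReadOnlyBuffer()).toString();
  }
}
```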

}

@Override
public int indexOf(@Nullable ByteBuffer value)
Contributor

Javadoc should explicitly tighten the contract such that this returns (-(insertion point) - 1) on no match.

Member Author

Yeah, I noticed this too. Part of the problem is that, classically, this isn't strictly required in all uses of Indexed.indexOf, but looking around there is only one usage/implementation which couldn't implement this contract, in https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java#L148, which is using it to determine if the dictionary contains a null value when merging, and which, when backed by the string indexer, gets a -1 for any value that isn't in the dictionary: https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnIndexer.java#L96.

This usage could probably be replaced with a method to indicate whether the value dictionary contains null, and we could then strengthen the contract of indexOf, since all remaining users would be related to column value dictionary stuff that requires it. I wonder if we should also consider adding a method to check whether the Indexed is sorted, since that indexOf contract only really makes sense for finding endpoints of value ranges using the same comparator the dictionary is sorted with.

I considered making a ValueDictionary interface with a stronger contract around this behavior, but @cheddar pushed back, asserting we already have too many interfaces, which... is fair.

Thoughts on pushing the extra methods into Indexed so that the contract can be strengthened to (-(insertion point) - 1) if isSorted() returns true or something, and adding hasNull to allow the string merger to do the check it needs?

Member Author

I ended up adding isSorted which when true strengthens the contract of indexOf to be (-(insertion point) - 1). GenericIndexed (and BufferedIndexed produced by its singleThreaded), FixedIndexed, and FrontCodedIndexed all implement it.
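
For illustration, the strengthened contract mirrors Collections.binarySearch, so a caller can recover the insertion point on a miss and use it as a range endpoint:

```java
import java.util.Collections;
import java.util.List;

// Sketch: the (-(insertion point) - 1) contract lets range lookups find where a missing value
// would sort, without a separate "contains" check.
class InsertionPointSketch
{
  static int findRangeStart(List<String> sortedDictionary, String lowerBound)
  {
    final int index = Collections.binarySearch(sortedDictionary, lowerBound);
    return index >= 0 ? index : -(index + 1); // miss: the range starts at the insertion point
  }
}
```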

@paul-rogers (Contributor) left a comment

@clintropolis, this is an impressive bit of work! Looks like @imply-cheddar provided comments about the integration of this new index into Druid: I don't have the context to comment on that. Instead, I focused on the implementation of the algorithm and its encoding. I left a few questions, probably mostly reflecting my own ignorance of the patterns used in this area of the code.

* https://github.com/lemire/JavaFastPFOR/blob/master/src/main/java/me/lemire/integercompression/VariableByte.java
*
*/
public static int readInt(ByteBuffer buffer)
Contributor

The garbage issue is a valid one: low level code should minimize garbage. It would seem a bug if we create many ByteBuffers. There should be one per column (or buffer), not one per column value. If that is true, then the number should be reasonable.

@clintropolis suggests that, to take a position as input, we would have to return a length for variable-length fields. This generates garbage per call as we create, then discard, the required Pair and Integer objects. One could pass an AtomicInteger, but that seems fiddly.

This implementation seems the correct one. If we have a "too many ByteBuffers" problem, perhaps we should fix that instead.

public final class IndexedUtf8ValueSetIndex<TDictionary extends Indexed<ByteBuffer>>
implements StringValueSetIndex, Utf8ValueSetIndex
{
// This determines the cut-off point to swtich the merging algorithm from doing binary-search per element in the value
Contributor

swtich -> switch

}

/**
* Adapter to convert {@link Indexed<ByteBuffer>} with utf8 encoded bytes into {@link Indexed<String>} to be frinedly
Contributor

frinedly -> friendly

final int numRows = offset.getCurrentVectorSize();

for (int i = 0; i < numRows; i++) {
// Must use getUnshared, otherwise all elements in the vector could be the same shared object.
Contributor

Why would that be a bad thing? Are these objects mutated?

Member Author

This code is more or less copied from StringDictionaryEncodedColumn with minor adjustments to use a different value dictionary, but yeah, the thing returned by get can be mutated.

Specifically, https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplier.java#L177, which was optimized towards a non-vectorized hot-loop use case where callers call get, immediately extract/process the int values of the returned IndexedInts, and don't hold onto the IndexedInts after calling get for the next row.

This method is to ensure that it's ok to hold onto the IndexedInts after get is called for some other row.

{
final int adjustedIndex;
// due to vbyte encoding, the null value is not actually stored in the bucket (no negative values), so we adjust
// the index
Contributor

The presence of a null value seemed to be used elsewhere to tell us whether the column contains nulls (if I understood correctly). This suggests we need a null entry only if there are, in fact, null values. In SQL, NULL and "" are distinct things. There is, unfortunately, no string shorter than length 0. However, if we encode the entry as (length, bytes), can we use a length of all 1s (i.e. -1) to indicate a null value, which would be present only if the column contains at least one null (in SQL-speak, if the column is nullable)?

final int adjustIndex = hasNull ? 1 : 0;
final int div = Integer.numberOfTrailingZeros(bucketSize);
final int rem = bucketSize - 1;
return () -> new FrontCodedIndexed(
Contributor

Suggestion: rather than making FrontCodedIndexed dumb and this Supplier smart, does it make sense to pass the parameters into the constructor and let it do the math? Else, we'd have to check all the places the ctor is used to ensure that they did the same math. (And, if it is used only once, the ctor is trusting an external source to do its math for it...)

Maybe there are two sources: write and read? If so, can the common stuff be in the ctor? Or, provide two ctors, one for each case?

Member Author

yeah, probably could push a lot of this computation into the constructor - numBuckets, lastBucketNumValues, adjustedNumValues, adjustIndex, div, and rem are all computed. I didn't think it was that big a deal either way, since it is a private constructor only called via the static create method (which has far fewer arguments), so there shouldn't really be any other callers. And, while very minor, the way it is here does avoid recomputing these values every time the supplier creates a new FrontCodedIndexed.

Member Author

After thinking a bit more about it, I ended up pushing the computations into the constructor. While we do have to repeat them, it has the benefit of making the supplier smaller when the segment is at rest, which makes up for it I think.

Member Author (@clintropolis, Oct 25, 2022)

on the topic of heap usage, this actually adds up quite a bit: the same Wikipedia segment takes 36kb on heap instead of 49kb by using this lightweight supplier instead of GenericIndexed:

(screenshot: heap usage comparison)

I did not compare before and after pushing the computation into the constructor, and don't expect it would have been nearly as heavy as GenericIndexed, but it still seems worth it since the supplier needs to hold onto about 1/3 as many parameters to do its thing.
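As a side note for readers, here is a small standalone sketch of the adjustIndex/div/rem math quoted at the top of this thread (illustrative values, not the PR's exact code): with a power-of-two bucketSize, division and modulo become a shift and a mask, and a leading null entry shifts every stored value's index by one.

// Sketch of the index math discussed above (illustrative, not the PR's exact code).
public class BucketMathSketch
{
  public static void main(String[] args)
  {
    final boolean hasNull = true;
    final int bucketSize = 16;                                   // must be a power of two
    final int adjustIndex = hasNull ? 1 : 0;                     // null takes dictionary id 0 but is not stored
    final int div = Integer.numberOfTrailingZeros(bucketSize);   // 4
    final int rem = bucketSize - 1;                              // 0x0F

    final int index = 38;                          // dictionary id being looked up
    final int adjustedIndex = index - adjustIndex; // 37: position among the stored (non-null) values
    final int bucket = adjustedIndex >> div;       // 37 / 16 = 2
    final int posInBucket = adjustedIndex & rem;   // 37 % 16 = 5

    System.out.println("bucket=" + bucket + ", position=" + posInBucket);
  }
}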


// we also compare against the adjacent bucket to determine if the value is actually in this bucket or
// if we need to keep searching buckets
final int nextOffset = getBucketOffset(currentBucket + 1);
Contributor

Check if we're already on the last bucket to prevent reading past the end of the buffer?

Contributor

that wouldn't happen, since maxBucketIndex has to be greater than minBucketIndex or the code won't enter this loop, and currentBucketIndex would come out to be less than maxBucketIndex, based on the calculation.
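To make the lookup flow concrete, here is a toy sketch (not the PR's code; values and buckets are made up and fully materialized) of the two-level search being discussed: binary-search the first value of each bucket to pick a candidate bucket, then scan within that bucket.

import java.util.Arrays;

// Illustrative two-level lookup: binary search over bucket "first values",
// then a scan inside the chosen bucket.
public class BucketSearchSketch
{
  public static void main(String[] args)
  {
    // first value of each bucket, in sorted order
    String[] bucketFirstValues = {"apple", "durian", "mango"};
    // fully decoded buckets (in the real column these are front-coded, not materialized)
    String[][] buckets = {
        {"apple", "banana", "cherry"},
        {"durian", "fig", "grape"},
        {"mango", "melon", "peach"}
    };

    String target = "grape";

    // a miss returns (-(insertion point) - 1), so the value can only live in the preceding bucket
    int pos = Arrays.binarySearch(bucketFirstValues, target);
    int bucket = pos >= 0 ? pos : -(pos + 2);

    // linear scan within the bucket
    int indexInBucket = Arrays.asList(buckets[bucket]).indexOf(target);
    System.out.println("bucket=" + bucket + ", indexInBucket=" + indexInBucket);
  }
}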

*
* This method modifies the position of the buffer.
*/
private static ByteBuffer[] readBucket(ByteBuffer bucket, int numValues)
Contributor

Nit picky: do we need a ByteBuffer for each value, or just a byte[]? That is, do we then do anything fancy with the values or just treat them as a block of bytes? Using an array halves the garbage that this method creates.

Or, if the primary use of the values is as Strings, should this return a String to avoid a conversion later?

Member Author

Downstream from this, DimensionDictionarySelector currently provides two methods to translate dictionary ids into values: one which deals in ByteBuffer to provide the raw values, and another that can provide the String values for stuff that actually needs it. (Some future adjustments need to be made to these interfaces to support other kinds of dictionary encoded columns, or the ByteBuffer-based method needs utf8 removed from its name to be less opinionated about its contents, but that's a discussion for another time.)

Anyway, if we returned byte[] here we would still have to wrap them in a ByteBuffer to satisfy the contract of the downstream stuff when making selectors for the column.
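For readers following along, here is a toy sketch of what reconstructing a bucket from front-coded entries looks like, using plain Strings and ints rather than the ByteBuffers and VByte-encoded lengths the real readBucket works with; the sample values are made up.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Toy reconstruction of a bucket from (prefix length, suffix) entries.
public class ReadBucketSketch
{
  public static void main(String[] args)
  {
    String first = "druid";
    int[] prefixLengths = {5, 3, 3};
    String[] suffixes = {"ism", "m", "mmer"};

    byte[] firstBytes = first.getBytes(StandardCharsets.UTF_8);
    String[] values = new String[prefixLengths.length + 1];
    values[0] = first;
    for (int i = 0; i < prefixLengths.length; i++) {
      // prefix comes from the bucket's first value, suffix from the stored fragment
      values[i + 1] = new String(firstBytes, 0, prefixLengths[i], StandardCharsets.UTF_8) + suffixes[i];
    }

    // prints [druid, druidism, drum, drummer]
    System.out.println(Arrays.toString(values));
  }
}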

}

// if the bucket buffer is full, write the bucket
if (numWritten > 0 && (numWritten % bucketSize) == 0) {
Contributor

Where do we pick the optimal bucket "key frames"? It looks like we write buckets until they are full. Doesn't this put us at the mercy of the data distribution rather than trying to find (near) optimal key values? That is, in the worst case, we could have a list of "a", "b", "ba", "bab", "bob", ... and our bucket would start with "a" and there would be no compression. If we instead made a 1-value bucket, then started the next with "b", we'd save 3 bytes from the following values.

The papers linked in the PR suggest a sampling algorithm, but it does appear to make multiple passes over the data, and thus can't be done streaming. Or, is this one of the future enhancements mentioned in the description? If it is, does this encoding support variable-sized buckets? We'd still have n buckets, but sizes would vary around the (value count / n) mean.
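For contrast with the read-path sketch earlier, here is a toy sketch of the write side as described in this PR: the first value is stored whole, and each later value as (length of shared prefix with the bucket's first value, remaining suffix). The sample strings are made up, and the real writer operates on UTF-8 ByteBuffers with VByte-encoded integers rather than building a String list.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy front coding of one bucket of sorted values.
public class FrontCodeBucketSketch
{
  public static void main(String[] args)
  {
    String[] bucket = {"druid", "druidism", "drum", "drummer"};

    byte[] first = bucket[0].getBytes(StandardCharsets.UTF_8);
    List<String> encoded = new ArrayList<>();
    encoded.add("full:" + bucket[0]);

    for (int i = 1; i < bucket.length; i++) {
      byte[] value = bucket[i].getBytes(StandardCharsets.UTF_8);
      int prefix = 0;
      while (prefix < first.length && prefix < value.length && first[prefix] == value[prefix]) {
        prefix++;
      }
      String suffix = new String(value, prefix, value.length - prefix, StandardCharsets.UTF_8);
      encoded.add("(" + prefix + ", \"" + suffix + "\")");
    }

    // prints: [full:druid, (5, "ism"), (3, "m"), (3, "mmer")]
    System.out.println(encoded);
  }
}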


@Nullable
@Override
public byte[] get(int index) throws IOException
Contributor

One wonders what a get is doing in a writer...

Member Author

yeah, it's a bit strange. The primary usage is when something needs to read what it has written out before it is written to a permanent place, such as https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java#L166, which is merging spatial indexes during string column merging.

Contributor (@abhishekagarwal87) left a comment

Great work @clintropolis. Some minor comments from my side. I didn't do a thorough review, just enough to build my understanding of the feature.

@Override
public Iterable<ImmutableBitmap> getBitmapIterable()
{
return () -> new Iterator<ImmutableBitmap>()
Contributor

nit: The iterator impl could go into a private class for better readability.

Comment on lines +77 to +82
if (!nextSet) {
findNext();
if (!nextSet) {
throw new NoSuchElementException();
}
}
Contributor

Suggested change
if (!nextSet) {
findNext();
if (!nextSet) {
throw new NoSuchElementException();
}
}
if (!hasNext()) {
throw new NoSuchElementException();
}
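For context, here is a self-contained toy version of the lazy findNext/nextSet iterator pattern this suggestion simplifies; with next() delegating to hasNext(), the duplicated check disappears. All names and the filter are made up for illustration.

import java.util.Iterator;
import java.util.NoSuchElementException;

// Lazy iterator: findNext() advances to the next matching element on demand.
public class LazyIteratorSketch implements Iterator<Integer>
{
  private final int[] values = {2, 4, 6, 8};
  private int cursor = 0;
  private Integer next = null;
  private boolean nextSet = false;

  private void findNext()
  {
    while (cursor < values.length) {
      int candidate = values[cursor++];
      if (candidate % 4 == 0) {        // arbitrary filter, stands in for a bitmap lookup
        next = candidate;
        nextSet = true;
        return;
      }
    }
  }

  @Override
  public boolean hasNext()
  {
    if (!nextSet) {
      findNext();
    }
    return nextSet;
  }

  @Override
  public Integer next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    nextSet = false;
    return next;
  }

  public static void main(String[] args)
  {
    LazyIteratorSketch it = new LazyIteratorSketch();
    while (it.hasNext()) {
      System.out.println(it.next()); // prints 4 then 8
    }
  }
}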

Comment on lines +95 to +96
dictionary = GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.BYTE_BUFFER_STRATEGY, mapper);
frontCodedDictionary = null;
Contributor

the answer is probably somewhere in the code but I didn't find it. Why is frontCodedDictionary a supplier when dictionary is not?

Member Author (@clintropolis, Oct 23, 2022)

Ah, good question. I decided to make it a Supplier for users of FrontCodedIndexed so that I don't have to do what nearly all of the actual users of GenericIndexed do, which is call singleThreaded() on the top-level column GenericIndexed dictionary to get an optimized version that isn't thread-safe, for use in a single thread, so that it creates less garbage. By using a supplier I can avoid all this, since every thread just gets its own copy (which is basically what all callers are doing anyway).
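A toy sketch of the pattern described (hypothetical Reader type, not Druid's classes): the column keeps a cheap Supplier, and each query thread builds its own non-thread-safe reader instead of sharing one instance.

import java.util.function.Supplier;

// Each thread gets a private, non-thread-safe reader from a shared Supplier.
public class SupplierPatternSketch
{
  // stand-in for the dictionary reader: cheap to construct, keeps per-instance scratch state
  static class Reader
  {
    private final StringBuilder scratch = new StringBuilder(); // not thread-safe

    String read(int id)
    {
      scratch.setLength(0);
      return scratch.append("value-").append(id).toString();
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    Supplier<Reader> dictionarySupplier = Reader::new;

    Runnable queryThread = () -> {
      Reader reader = dictionarySupplier.get(); // private copy, no shared mutable state
      System.out.println(Thread.currentThread().getName() + ": " + reader.read(7));
    };

    Thread t1 = new Thread(queryThread);
    Thread t2 = new Thread(queryThread);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
  }
}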

* are not present).
*
* The value iterator reads an entire bucket at a time, reconstructing the values into an array to iterate within the
* bucket before moving onto the next bucket as the iterator is consumed.
Contributor

copy the layout description from the PR description into Javadoc here.

+1. GenericIndexed has a similar description.

next = dictionary.indexOf(nextValue);

if (next == -dictionarySize - 1) {
// nextValue is past the end of the dictionary.
Contributor

Suggested change
// nextValue is past the end of the dictionary.
// nextValue is past the end of the dictionary and we can break early instead of going through all values in iterator

I know that it might be obvious, but it took me a bit of time to connect the dots :)
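For reference, the indexOf convention this check relies on, as a tiny standalone example with a made-up dictionary: a miss returns -(insertion point) - 1, so a result of -(dictionarySize) - 1 means the value sorts after everything in the dictionary and iteration can stop early.

import java.util.Arrays;

// Demonstrates the "past the end" binary-search result the early break checks for.
public class IndexOfConventionSketch
{
  public static void main(String[] args)
  {
    String[] dictionary = {"alpha", "beta", "gamma"};
    int dictionarySize = dictionary.length;

    int next = Arrays.binarySearch(dictionary, "zzz");
    if (next == -dictionarySize - 1) {
      System.out.println("\"zzz\" is past the end of the dictionary; stop iterating");
    }
  }
}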


Contributor (@gianm) left a comment

LGTM after the latest set of changes. Nice work!

Successfully merging this pull request may close these issues.

Incremental encoding for GenericIndexed<String>