
Druid 'Shapeshifting' Columns #6016

Open · wants to merge 1 commit into master

Conversation

clintropolis
Member

@clintropolis clintropolis commented Jul 18, 2018

This PR introduces a new approach to representing numerical columns in which row values are split into varyingly encoded blocks, dubbed shape-shifting columns for obvious, druid reasons. In other words, we are trading a bit of extra compute and memory at indexing time to analyze each block of values and find the most appropriate encoding, in exchange for potentially increased decoding speed and decreased encoded size on the query side. The end result is hopefully faster performance in most cases.

Included in this PR is the base generic structure for creating and reading ShapeShiftingColumn as well as an implementation for the integer column part of dictionary encoded columns, ShapeShiftingColumnarInts. I have an additional branch in development with an implementation for long columns which was done as a reference to make sure the generic base structure is workable and will follow on the tail of this PR, as well as some experiments I would like to do with floats/doubles, so they hopefully won't be far behind either. For ints at least, the result is something that in many cases will decode as fast as or faster than, and be as small as or smaller than, CompressedVSizeColumnarInts with lz4.

tl;dr

The good

This PR can likely make your queries faster and, maybe, your data smaller. Based on observations thus far, I would advocate making this the default in whatever major version + 1 this PR makes it into (if it continues to perform well in the wild of course), but the intention for now is for it to be an optional/experimental feature specified at indexing time. You will mainly see two types of plots generated from benchmarks in this writeup, comparing ShapeShiftingColumnarInts to CompressedVSizeColumnarInts to try to convince you of this. One of them is a summary of encoded sizes by column and for the whole segment:

wiki-2-size-summary

and the other is a line chart of 'time to select n rows', where 'time to select' is the y axis and 'n', as a fraction of total rows in the column, is the x axis:

wiki-2-select-speed

as well as 2 bar charts, one showing an encoded size comparison, the other an encoding speed comparison. Most of these are animations which cycle through each column. The dip at the end of the select speed lines is actually a cliff, caused by the benchmark iterating the column without a simulated filter to get raw total scan speed for the entire column. Beyond benchmarks, there is a bunch of design description and background on how things ended up the way they are, to hopefully make review easier.

The bad

This isn't currently better 100% of the time, specifically in the case of low cardinality dimensions that have a semi-random distribution without a lot of repeated values. The reason shape-shifting columns perform worse in this case is that they use fixed block sizes decided prior to indexing, whereas CompressedVSizeColumnarIntsSerializer byte packs on the fly until a fixed size buffer is full, effectively allowing it to vary block size to fit up to 4 times as many values per block as the current largest block size setting provided for the shape-shifting serializer, which lets lz4 take advantage of having more values to work with per run. There is nothing preventing the use of varying sized blocks for shape-shifting columns, so this could likely be overcome in one way or another; I am just currently erring on the side of caution to attempt to limit the heap pressure that this PR already introduces where there previously was none, which makes me nervous but has seemed ok so far while running on a test cluster. Additionally, because these columns have a different memory profile than CompressedVSizeColumnarInts for both indexing and query, jvm configuration adjustments will likely need to be made, especially if your cluster is tuned to effectively use most of its resources. See the 'Memory Footprint' section of the design section for more details. Overall indexing time might also change, sometimes faster, sometimes slower; encoding speed is dependent on value composition so it can range quite a lot between columns.

Design

On the query side, ShapeShiftingColumn implementations are designed to implement the actual logic to get values for row indices within the column implementation itself, which through meticulous benchmarking has proven to be the most performant arrangement I could find that still does the things we are doing here. This is an alternative approach to pushing down reads into a decoder type, as is done in the BlockLayout.../EntireLayout... implementations for long columns. To put row reads in the column, a ShapeShiftingColumn implementation operates on a slightly unintuitive pattern: for each block it will pass itself into a decoder which knows how to mutate the column such that the column's 'get' function will produce the correct data for the row index. Vague, I know, but we'll get there. Sticking with the theme, these phase changes between the chunks of the data while reading are called transformations. This design is a bit less straightforward than I was hoping to end up with, but this inversion of responsibility allows the implementation to beat the performance penalty that the BlockLayout.../EntireLayout... with EncodingReader pattern faces, which is vaguely alluded to here. More on this later (and pictures).

Background

FastPFOR

The work in this PR actually began life as a follow-up investigation into using the FastPFOR algorithm to encode integer value columns, writing additional benchmarks to expand upon the random valued columns tested in that thread, using the column value generators to compare FastPFOR against CompressedVSizeColumnarInts across a wide variety of column value distributions. Things looked really good in early tests; FastPFOR was doing very well, often outperforming the lz4 bytepacking across the board on encoding speed, encoded size, and decoding speed.

generator-fastpfor-select-speed

From there, benchmarks were expanded to test against actual integer columns from actual segments, where FastPFOR was initially looking even better as a standalone option.

wiki-2-all-fastpfor-size-summary

wiki-2-fastpfor-select-speed

However, things changed once I began looking at one of our 'metrics-like' data sets, where the encoded size using FastPFOR was on the order of double the size of the existing lz4 bytepacking.

clarity-1-fastpfor-size-summary

Examining more metrics datasources produced similar results. Many columns decoded faster, but the increased size cost was too much to be something that we could consider making the default, which would greatly reduce the utility of this feature if it were to require cluster operators to experiment with segments by hand to compare FastPFOR and lz4 bytepacking to see which performed better with their columns. Note that this is unrelated to the previously mentioned block size issue; increased block size had little effect on overall encoded size, it's more a case of this being a data distribution which FastPFOR overall does pretty poorly with compared to lz4.

Thinking that the sparse nature of many metrics datasets might be the cause, I made a 'RunFastPFOR' version of the column that eliminated blocks that were entirely zeros or a constant value by putting a byte header in each chunk to indicate if a block was zero, constant, or FastPFOR encoded, which helped a bit but still couldn't compete with the existing implementation. Noting that encoding speed for FastPFOR was so much faster than lz4, we considered a system that would try both and use whichever was better (similar to the intermediary column approach with longs), and also looked into introducing a run length encoding into the mix to see how much repeated values played a role in how well lz4 was doing comparatively. Searching for some reading about run length encoding, I came across some Apache ORC encoding docs (the original link is dead, but most of the content is contained here) which, upon reading, caused something to click: my earlier 'RunFastPFOR' sketch was maybe on to something, but the idea should be to try all encodings and vary the choice per block. This is similar to the 'auto' idea via intermediary columns used in longs, but can much better capitalize on encodings that do well in specialized circumstances, since an encoding doesn't have to do well for the entire column at once. 'Shape-shifting' columns were born and I finally had something that was competitive even on metrics-like datasets. Sizes still occasionally come out larger in some cases, due to differences in block sizes used between the 2 strategies, but in observations so far it has been able to achieve acceptable sizes.

Column Structure

Concurrent with the FastPFOR experiments, I was also investigating how best to wire it into the current segment structure, should it prove itself useful. Previous efforts had done work to introduce the structure to have a variety of encodings for long, float, and double columns, so I began there. I created EntireLayoutColumnarInts and BlockLayoutColumnarInts in the pattern that was done with longs, and ported the byte packing algorithm we currently employ for ints into the EncodingReader and EncodingWriter pattern used by ColumnarLongs. Benchmarking it, however (bytepacked in the chart below), showed a performance slowdown compared to the existing implementation (vsize-byte aka VSizeColumnarInts).

entire-layout

This is the performance drop in action from the overhead of these additional layers of indirection mentioned above. It is also apparent in the long implementation:

long-auto

where at low row selection lz4-auto wins because it's decompressing a smaller volume of data, but when reading more values lz4-longs, which optimizes out the encoding reader entirely, becomes faster.

After it became apparent that FastPFOR alone was not a sufficient replacement for lz4 bytepacking, I sketched a couple of competing implementations of ShapeShiftingColumnarInts to try to both have a mechanism to abstract decoding and reading values from the column implementation, but not suffer the same performance drop of the earlier approach. One was a simpler 'block' based approach that always eagerly decoded everything into an array, on the hope that because row gets are just an array access if on the same chunk, it could keep up with CompressedVSizeColumnarInts, which eagerly decompresses but supports random access to individual row values by unpacking bytes on the fly. The other approach was attempting to support random access for encodings that supported it, similar in spirit to the BlockLayout approach, but with one less layer of indirection. It had more overhead than the array based approach since it had a set of decoder objects and pushed reads down into them, but less than the BlockLayout approach. Initial results were as expected: random access did better for smaller row selections, but was slower for a large scan than the array based approach, and both did reasonably well compared to CompressedVSizeColumnarInts. In the following plots, shapeshift is the random access approach, shapeshift-block is the array based approach.

shapeshift-ok

However, with my random access implementation I eventually ran into a degradation similar in nature to what I saw with the BlockLayout/EntireLayout approach, but it only happened when more than 2 encodings were used for a column.

shapeshift-sad

I refactored a handful of times to try and outsmart the jvm, but as far as I can tell from running with print inlining and assembly output enabled, I was running into a similar issue with byte-code getting too large to inline. However random access did quite a lot better for low selectivity in benchmarks, so I continued experimentation to try and get the best of both worlds, which led me to where things are now - decoders that mutate the column to either read 'directly' from memory, or to populate an int array.

I suspect even this implementation is dancing perilously close to going off the rails in the same way, so care should definitely be taken when making any changes. However, this is true of everything in this area of the code; I find it impressively easy to just flat out wreck performance with the smallest of changes, so this is always a concern. My standard practice while working on this PR has been to fire off a full regression of benchmarks on a test machine with nearly every change I've made, and I have run thousands of hours of them at this point.

ShapeShiftingColumn Encoding

The generic base types for producing a ShapeShiftingColumn are

  • abstract class ShapeShiftingColumnSerializer<TChunk, TChunkMetrics extends FormMetrics> - base column serializer which has parameters to specify the type of chunks to be encoded (e.g. int[] or long[]), and the type of metrics that are collected about that chunk used during encoding selection.
  • interface FormEncoder<TChunk, TChunkMetrics extends FormMetrics> - the interface describing... encoders; it has generic parameters to match those of the ShapeShiftingColumnSerializer.
  • abstract class FormMetrics - the base type for the data collected about the current chunk during indexing, the ShapeShiftingColumnSerializer metric type generic parameter must extend this class.

ShapeShiftingColumnSerializer provides all typical column serialization facilities for writing to SegmentWriteOutMedium, and provides a method for 'flushing' chunks to the medium, flushCurrentChunk, which is where encoding selection is performed. Implementors must implement 2 methods, initializeChunk and resetChunkCollector, which allocate the chunk storage and reset the metrics collected about a chunk, respectively. It's not perfectly automatic - implementors must manually add values to the current chunk, track when to call the flush mechanism, and ensure that the FormMetrics are updated, all within the 'add' method of the column type serializer interface the column implements. For ints, it looks like this:

@Override
public void addValue(int val) throws IOException
{
  // if the current chunk is full, select an encoding and flush it to the write out medium
  if (currentChunkPos == valuesPerChunk) {
    flushCurrentChunk();
  }

  // update the metrics used later for encoding selection
  chunkMetrics.processNextRow(val);

  currentChunk[currentChunkPos++] = val;
  numValues++;
}

I blame this shortcoming more on Java generics not working with value types than anything else, as repeated near identical structure across column types is pretty common in this area of the code.

Encoding Selection

Encoding selection is probably the most 'magical' part of this setup. The column serializer will iterate over all available FormEncoder implementations, which must be able to estimate their encoded size, as well as provide a 'scaling' factor that correlates to decoding speed relative to all other decoders and is adjusted for the optimization target. The 'lowest' value indicates the best encoder, which is used to encode that block of values to the write out medium. This is the icky part, in that relatively thorough performance analysis must be done to see how encoding speeds relate to each other, and to tune the responses of the scaling function accordingly.

lineitem-1-shapeshift-breakdown-size-summary

lineitem-1-shapeshift-breakdown-select-speed

The optimization target allows some subtle control over behavior:

  • ShapeShiftingOptimizationTarget.SMALLER - "make my data as small as possible"
  • ShapeShiftingOptimizationTarget.FASTBUTSMALLISH - "make my data small, but be chill if a way faster encoding is close in size" (default)
  • ShapeShiftingOptimizationTarget.FASTER - "I'm willing to trade some reasonable amount of size if it means more of my queries will be faster"

I think there is a lot of room for improvement here, and I don't know how scalable this will be. However, if the general idea is workable I would rather build improvements as an additional PR since this is already quite expansive. A very easy one would be noticing 'zero' and 'constant' blocks (if encodings exist) and short circuiting general encoding selection, since those will always be the fastest and smallest choices, if available.
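
To make the selection concrete, here is a minimal sketch of the kind of loop flushCurrentChunk performs for int chunks. getEncodedSize and getSpeedModifier follow the description above, but the helper method itself and its exact signature are illustrative rather than the actual code:

private FormEncoder<int[], IntFormMetrics> chooseCodec(
    List<FormEncoder<int[], IntFormMetrics>> codecs,
    int[] currentChunk,
    int currentChunkPos,
    IntFormMetrics chunkMetrics
)
{
  FormEncoder<int[], IntFormMetrics> bestCodec = null;
  double bestScore = Double.MAX_VALUE;
  for (FormEncoder<int[], IntFormMetrics> codec : codecs) {
    // estimated encoded size, weighted by a speed modifier derived from the optimization target
    final double score =
        codec.getEncodedSize(currentChunk, currentChunkPos, chunkMetrics) * codec.getSpeedModifier(chunkMetrics);
    if (score < bestScore) {
      bestCodec = codec;
      bestScore = score;
    }
  }
  // the winning encoder is then used to write this chunk (and its header byte) to the write out medium
  return bestCodec;
}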

Compression is provided generically, since it operates at the byte level in a value agnostic way, and is available for any encoder that implements CompressibleFormEncoder. Implementing this interface allows encoding to a temporary ByteBuffer to be compressed at indexing time. However, compression very dramatically increases encoding time because it requires actually encoding and compressing the values to determine the encoded size, so we need to be very considerate about what algorithms we attempt to compress. We at least preserve the temporary buffer, so if the last compressed encoding strategy to be used is selected it does not have to be re-encoded. It would probably be legitimate to just make all encoders compressible and get rid of the extra interface, but I haven't done that at this time.

ShapeShiftingColumn byte layout

Due to the shared base serializer, all ShapeShiftingColumn implementations have the following byte layout, consisting of 2 major parts: the header and the data.

| version (byte) | headerSize (int) | numValues (int) | numChunks (int) | logValuesPerChunk (byte) | compositionOffset (int) | compositionSize (int) | offsetsOffset (int) | offsetsSize (int) | composition (compositionSize bytes) | offsets (offsetsSize bytes) | values (remaining) |
Header
  • version - byte indicating column version
  • headerSize - size of header, to future proof header changes
  • numValues - total number of rows in the column
  • numChunks - number of value chunks in the column
  • logValuesPerChunk - log base 2 of chunk size
  • compositionOffset - starting byte position of variable sized composition header section
  • compositionSize - size in bytes of the column composition section
  • offsetsOffset - starting byte position of variable sized offset header section
  • offsetsSize - size in bytes of the 'offsets' section
  • composition - count metrics for all chunk codecs
  • offsets - in order starting offsets into the base ByteBuffer for each chunk, with 'final' offset indicating end of final chunk.
Data
  • The data section stores the actual encoded chunks, pointed to by the offsets.

The column structure is similar in many ways to GenericIndexed, but tailored to this specific purpose (and I was afraid to touch that, since it's everywhere).
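
For orientation, here is a rough sketch of reading the fixed portion of this header from the column's base ByteBuffer, in the documented field order and widths (the method itself is illustrative, not the actual reader):

// Illustrative walk of the fixed header fields described above.
static void readShapeShiftHeader(java.nio.ByteBuffer buffer)
{
  final byte version = buffer.get();
  final int headerSize = buffer.getInt();
  final int numValues = buffer.getInt();
  final int numChunks = buffer.getInt();
  final byte logValuesPerChunk = buffer.get();
  final int compositionOffset = buffer.getInt();
  final int compositionSize = buffer.getInt();
  final int offsetsOffset = buffer.getInt();
  final int offsetsSize = buffer.getInt();
  // the variable sized 'composition' and 'offsets' sections begin at compositionOffset and
  // offsetsOffset respectively, and the encoded chunk data follows the offsets section
}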

ShapeShiftingColumn Decoding

ShapeShiftingColumn and FormDecoder are the generic structure on the decoding side.

  • abstract class ShapeShiftingColumn<TShapeShiftImpl extends ShapeShiftingColumn> - Base type for reading shape-shifting columns. It uses the 'curiously recurring template' pattern to be able to share this common structure and flow, and has a generic parameter to grant the corresponding FormDecoder implementations access to the concrete column type. To read row values for a block of data, the ShapeShiftingColumn first reads a byte header which corresponds to a FormDecoder implementation, and then calls the decoder's transform method to decode values and prepare for reading.
  • interface FormDecoder<TColumn extends ShapeShiftingColumn> - Mutates a ShapeShiftingColumn with a transform method to allow reading row values for that block of data. What exactly transformation entails is column implementation specific, as these types are tightly coupled, but it can include things like decoding values into a buffer and setting it as the column's read buffer, populating a primitive value array provided by the column, and the like. The two primary models of column reads I had in mind are 'direct' reads from a ByteBuffer and array reads where row values for a chunk are eagerly decoded to a primitive array, which is what I needed to support both CompressionStrategy and FastPFOR.

Decompression is provided generically here too for any ShapeShiftingColumn through CompressedFormDecoder, which will decompress the data from the buffer and transform again using the inner decoder pointed at the decompressed data buffer. While transformation is implementation specific, raw chunk loading is not since it's just offsets on the base buffer, so ShapeShiftingColumn provides a loadChunk(int chunkNumber) that will jump to the correct offset of the requested chunk and call transform to prepare for value reading.
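
A rough sketch of that shared chunk loading flow follows; the field and method names here are illustrative, and the compressed wrapper via CompressedFormDecoder is omitted:

// Simplified chunk loading: find the chunk's start offset, read the header byte that
// identifies its FormDecoder, and let that decoder transform this column for reading.
protected void loadChunk(int chunkNumber)
{
  // start offset of the requested chunk, from the 'offsets' header section
  final int chunkStart = getChunkStartOffset(chunkNumber);
  // the first byte of every chunk identifies which codec encoded it
  final byte chunkCodec = buffer.get(chunkStart);
  currentValuesStartOffset = chunkStart + 1;
  // the decoder mutates this column so that subsequent 'get' calls read correct values
  decoders.get(chunkCodec).transform(this);
  currentChunk = chunkNumber;
}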

Similar to the shortcomings on the serializer side of things, implementors of ShapeShiftingColumn will also need to manually handle chunk loading based on block size in the row index get method. For integers, it looks like this:

@Override
public int get(final int index)
{
  // the chunk a row belongs to is the row index divided by the (power of 2) chunk size
  final int desiredChunk = index >> logValuesPerChunk;
  if (desiredChunk != currentChunk) {
    loadChunk(desiredChunk);
  }
  // read the row's position within the chunk using whichever form the decoder installed
  return currentForm.decode(index & chunkIndexMask);
}

where currentForm is a function that either reads a value from an array, or reads a value directly from a ByteBuffer, and which is selected by its implementation of transform.
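
To illustrate, the two shapes currentForm can take might look roughly like this; the functional interface and the field names are illustrative, not the actual types:

// Illustrative only: the 'array' and 'direct' read forms a transform can install.
@FunctionalInterface
interface IntGetter
{
  int decode(int indexInChunk);
}

// array form: a decoder eagerly filled 'decodedValues' with this chunk's values during transform
final IntGetter arrayForm = indexInChunk -> decodedValues[indexInChunk];

// direct form: e.g. single byte packed values read straight from the base buffer, starting at
// the offset the transform recorded for this chunk
final IntGetter directForm = indexInChunk -> buffer.get(valuesStartOffset + indexInChunk) & 0xFF;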

Memory access

Currently, reads prefer to use Unsafe if the underlying column is stored in native order on a direct buffer, for better performance due to the lack of bounds checking on the 'get' method; for non-native or non-direct reads, the column tracks offsets into the base ByteBuffer instead of slicing or creating any sort of other view. I know there is an initiative to move to Memory, and both of these approaches can be folded into that model. During experimentation I used Memory to see how it did, and without a CompressionStrategy its performance was closer to Unsafe than ByteBuffer, but with compression the overhead of wrapping made it slightly slower than just using ByteBuffer. That said, once compression supports Memory it should be a relatively straightforward change, and will reduce the duplicate unsafe/buffer versions of everything that currently exist. Additionally, JVM 9+ claims to have performance improvements for direct byte buffers and maybe other stuff worth investigating.
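
As a sketch of the kind of branch this implies (not the actual column code; UNSAFE here is the reflectively obtained sun.misc.Unsafe instance, and the other fields are illustrative):

// Illustrative read path selection: direct, native-ordered buffers can be read with Unsafe
// against the buffer's base address (no bounds checks); everything else falls back to
// absolute ByteBuffer gets against tracked offsets into the base buffer.
private int readIntAt(int offset)
{
  if (useUnsafe) {
    // baseAddress is the direct buffer's address; this path is only valid when the buffer
    // is direct and stored in ByteOrder.nativeOrder()
    return UNSAFE.getInt(baseAddress + offset);
  }
  return baseBuffer.getInt(baseBufferOffset + offset);
}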

Shape-shifting integer columns

ShapeShiftingColumnarInts can be enabled via a new option on IndexSpec:

IndexSpec.ColumnEncodingStrategy

property | type | default
strategy | string (IndexSpec.EncodingStrategy) | compression
optimizationTarget | string (IndexSpec.ShapeShiftingOptimizationTarget) | fastbutsmallish
blockSize | string (IndexSpec.ShapeShiftingBlockSize) | large

Example:
"indexSpec": {
  "dimensionCompression":"lz4",
  "intEncodingStrategy": {
    "strategy": "shapeshift",
    "optimizationTarget":"fastbutsmallish",
    "blockSize":"large"
  }
}

optimizationTarget and blockSize have a very subtle effect, as quite often the best codec for a given block is unambiguous. In essence, they provide control to sway the decision when there are multiple options that are very close in terms of encoded size.

Optimization targets:

wiki-2-optimize-size-summary

wiki-2-optimize-select-speed

Block sizes:

wiki-2-blocks-size-summary

wiki-2-blocks-select-speed

Given how similar the outcomes are, it may make sense to not expose one or both of these configuration settings at all, but on the other hand it seems useful to be able to influence the behavior in the manner that best suits the use case, no matter how minor the difference.

Codecs

FastPFOR

The initial algorithm investigation that started this whole endeavor, FastPFOR is available via this Java FastPFOR implementation and does really well in a lot of cases.

layout:
| header (byte) | encoded values (numOutputInts * Integer.BYTES) |

Bytepacking

Our old friend, byte packed integers, ported from the algorithm used by CompressedVSizeColumnarInts. Since values are analyzed on the fly, the number of bytes that values are packed into can vary on a chunk by chunk basis to be suited to the values. In some cases this can result in smaller column sizes than CompressedVSizeColumnarInts.

layout:
| header (byte) | numBytes (byte) | encoded values (numValues * numBytes) |
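
To give a feel for how numBytes can vary per chunk, here is a simplified illustration of choosing the byte width from the chunk's maximum value (which the chunk metrics already track); this is not the ported algorithm itself:

// Simplified illustration: the number of bytes needed to represent every value in the
// chunk, based on the maximum value observed while collecting chunk metrics.
static int bytesNeeded(int maxValue)
{
  // bits required for the largest value, rounded up to whole bytes (at least 1)
  final int bits = Integer.SIZE - Integer.numberOfLeadingZeros(maxValue);
  return Math.max(1, (bits + Byte.SIZE - 1) / Byte.SIZE);
}

With a per-chunk numBytes in hand, each value in the chunk is written using that many bytes, which is how a column can end up smaller than CompressedVSizeColumnarInts when a chunk's values happen to fit in fewer bytes than the column-wide maximum.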

Run Length Bytepacking

A simple run length encoding that uses the same byte packing strategy for integers, but uses the high bit to indicate whether a value is a 'run' length or a single non-repeating value. If the high bit is set, the value is a run length, and the next value is the value that is repeated that many times. There is very likely a better run-length implementation out there; I busted this out more or less from scratch based on the other byte packing algorithm to see how well a simple run length encoding would perform when thrown into the mix. The answer is that it does pretty decently, though it is situationally dependent, and I imagine there is a lot of opportunity for improvement here.

layout:
| header (byte) | numBytes (byte) | encoded values ((2 * numDistinctRuns * numBytes) + (numSingleValues * numBytes)) |
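
A sketch of decoding this run-length scheme, operating on values that have already been unpacked from their numBytes-wide representation (the method and output handling are illustrative, not the actual decoder):

// Illustrative decode of the run-length scheme described above: if the high bit of the
// packed width is set, the value (with the flag cleared) is a run length and the next
// value is repeated that many times; otherwise it is a single literal value.
static void decodeRuns(int[] packedValues, int numBytes, int[] out)
{
  final int runFlagMask = 1 << ((numBytes * Byte.SIZE) - 1);
  int outPos = 0;
  for (int i = 0; i < packedValues.length; i++) {
    final int v = packedValues[i];
    if ((v & runFlagMask) != 0) {
      final int runLength = v & ~runFlagMask;
      final int runValue = packedValues[++i];
      for (int r = 0; r < runLength; r++) {
        out[outPos++] = runValue;
      }
    } else {
      out[outPos++] = v;
    }
  }
}

This also matches the layout above: each run costs 2 packed values (length plus repeated value) and each single value costs 1, for (2 * numDistinctRuns + numSingleValues) * numBytes bytes of encoded values.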

Unencoded

Like the name says, full 4 byte ints; this is more for testing and reference, and I have yet to see it used in the experiments conducted so far.

layout:
| header (byte) | values (numValues * Integer.BYTES) |

Zero

This encoding is an optimization to employ if a chunk of values is all zero, allowing the chunk to be represented solely with the header byte. As you can imagine, this and constant encoding can dramatically reduce column size for really sparse columns.

zero-encoding

layout:
| header (byte) |

Constant

This encoding is an optimization to employ if a chunk of values is a non-zero constant, allowing the chunk to be represented with just the header byte and the constant value. Like zero, this can also dramatically reduce column size.

layout:
| header (byte) | constant (int) |

Compression

Compression algorithms from CompressionStrategy are available to wrap 'compressible' encodings. Out of the box, integer columns use lz4 with byte-packing and run length byte-packing. Unencoded is also compressible, but is not enabled by default, to reduce overall indexing time. The metadata size accounts for things like the number of bytes stored in the header of a bytepacking chunk, etc.

layout:
| header (byte) | inner codec header (byte) | inner codec metadata (innerForm.getMetadataSize()) | compressed data (remaining chunk size) |

Encoding

ShapeShiftingColumnarIntsSerializer is a ShapeShiftingColumnSerializer with int[] chunk type and IntFormMetrics to collect chunk metrics, which tracks minimum and maximum values, number of unique 'runs', the longest 'run', and the total number of values which are part of a 'run'.

Decoding

ShapeShiftingColumnarInts is a ShapeShiftingColumn<ShapeShiftingColumnarInts> using FormDecoder<ShapeShiftingColumnarInts> to decode values. ShapeShiftingColumnarInts decodes values in one of two basic ways depending on the encoding used: either eagerly decoding all values into an int[] provided by the column, or reading values directly from a ByteBuffer. For buffer reads, ShapeShiftingColumnarInts cheats (as we must whenever possible), and natively knows of the byte packing algorithm used by VSizeColumnarInts and CompressedVSizeColumnarInts as well as constants. The base transform for int columns is to decode values into the int[] - all decoders must currently support this, and may also implement a 'direct' transformation if the encoding supports it. I don't think it is strictly necessary that all decoders be able to produce to an array; admittedly this is more of an artifact of the journey that got me here and probably worth discussing.

Memory Footprint

ShapeShiftingColumnarInts does have a larger memory footprint than CompressedVSizeColumnarInts, having up to a 64KB direct buffer, a 64KB on-heap primitive 'decoded' values array, and a 65KB on-heap primitive 'encoded' values array, depending on block size. The FastPFOR encoder/decoder object is an additional 1MB on heap and a 200KB off-heap direct buffer. With a small patch to allow setting 'page size', the FastPFOR object heap usage could be ~1/4 of its current size, since the largest shape-shift block currently allowed is 16k values. All of the objects are pooled in CompressedPools and lazily allocated by the column, the on-heap objects with a maximum cache size so they don't grow unbounded during spikes of load (this might need to be a config option). Pooling both prevents merely materializing the column from taking up unnecessary space (i.e. during merging at indexing time) and is also much faster, since large on-heap allocations can be rather slow. These sizes can be reduced by using a smaller blockSize in the indexing spec, which will use correctly sized direct buffers and heap arrays.

The 64KB direct buffer and 64KB on-heap array are the primary memory vessels for the column, and are allocated from the pool for the lifetime of the column if they are required. They are both lazily allocated when a transformation that requires the space is encountered, so it's possible that only one or even (less commonly) none of them will be required. The 65KB 'encoded' values buffer is allocated and released for a very short time, only during a transformation for decoders that need to copy values to an int[] before decoding (e.g. FastPFOR). The FastPFOR pool is used in the same manner: an object is held for a very short lifetime during transformation, and then released back to the pool.

A downside of the current pooling approach I have in place is that each different block size has its own set of pools to support it, so if multiple block sizes are used in practice, there will be a larger amount of heap and direct memory overhead.
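
For reference, the general pattern being described is roughly a lazily-filled, bounded object pool along these lines (a generic sketch, not Druid's CompressedPools or StupidPool):

// Generic sketch of a lazily-filled, bounded object pool: objects are reused when
// available, allocated on demand otherwise, and at most 'maxCached' released objects
// are retained so the pool cannot grow without bound during spikes of load.
final class BoundedPool<T>
{
  private final java.util.ArrayDeque<T> cache = new java.util.ArrayDeque<>();
  private final java.util.function.Supplier<T> allocator;
  private final int maxCached;

  BoundedPool(java.util.function.Supplier<T> allocator, int maxCached)
  {
    this.allocator = allocator;
    this.maxCached = maxCached;
  }

  synchronized T take()
  {
    final T pooled = cache.poll();
    return pooled != null ? pooled : allocator.get();
  }

  synchronized void give(T object)
  {
    if (cache.size() < maxCached) {
      cache.add(object);
    }
    // otherwise drop the object and let the garbage collector reclaim it
  }
}

A pool like this would be instantiated per object type and per block size (e.g. with an allocator like () -> new int[1 << logValuesPerChunk]), which is why each block size currently needs its own set of pools.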

Encoding Selection

Encoding selection for ShapeShiftingColumnarIntsSerializer was tuned by extensively benchmarking each encoding in a standalone manner and coming up with a selection speed ranking to properly weight the scaling factor of each IntFormEncoder implementation.

wiki-1-shapeshift-breakdown-size-summary

wiki-1-shapeshift-breakdown-select-speed

'zero' and 'constant' are the fastest, followed by 'bytepacking', with 'FastPFOR' very close behind and finally 'lz4 bytepacking'. Run length encoding performance seems very contextual: it can be quite slow if there are not a lot of runs, but can be very quick if there are many repeated values, so its scaling value adjusts based on the data collected in IntFormMetrics, and compression is not attempted at all if not enough of the overall values are 'runs'.

Benchmarks

All benchmarks were generated on a c5.large and corroborated on my macbook pro with the new benchmarks introduced in this PR. You can even try at home with your own segments in a semi-manual way, replacing the segments referenced (but not supplied) by BaseColumnarIntsFromSegmentsBenchmark by setting the parameters columnName, rows, segmentPath (which points to the segment file), and segmentName. Benchmarks are run in 2 phases: the first encodes the columns with each encoding specified in BaseColumnarIntsBenchmark, and the second selects values out of each column. To run, from the root druid source directory, use these commands:

$ java -server -cp benchmarks/target/benchmarks.jar io.druid.benchmark.ColumnarIntsEncodeDataFromSegmentBenchmark
...
$ java -server -cp benchmarks/target/benchmarks.jar io.druid.benchmark.ColumnarIntsSelectRowsFromSegmentBenchmark
...

and the results will be written to column-ints-encode-speed-segments.csv and column-ints-select-speed-segments.csv respectively.

I've also attached the R code I used to generate all of these plots, which you can feed the csv output into like so:

source("./path/to/plot-column-bench.R")

wikiSize = 533652 # number of rows in segment
wiki1SelectSpeed1 = read.csv("./path/to/column-ints-select-speed-segments.csv")
wiki1EncodeSpeed1 = read.csv("./path/to/column-ints-encode-speed-segments.csv")

# to generate gif images:
animatePlot(plotIntSegBench(wiki1SelectSpeed1, wiki1EncodeSpeed1, wikiSize), "wiki-1-1.gif")
animatePlot(plotIntSegSizeSummary(wiki1EncodeSpeed1, wikiSize), "wiki-1-summary-1.gif")

# or just generate plots in R studio:
plotIntSegBench(wiki1SelectSpeed1, wiki1EncodeSpeed1, wikiSize)
plotIntSegSizeSummary(wiki1EncodeSpeed1, wikiSize)

Segments

Datasource: 'Wikiticker'

The wikiticker dataset is a relatively small volume realtime datasource on our test cluster that scrapes wikipedia edits from irc to feed into kafka for ingestion. Realtime segments are quite small, so I focused on compacted segments for benchmarks to be more in line with 'recommended' segment sizes: one partially compacted with ~500,000 rows, and another fully compacted to ~3.5 million rows. In my observations shapeshift reduces int column sizes by 10-15%, with a notably faster scan time.
Column sizes:

wiki-1-size-summary

Row select speed:

wiki-1-select-speed

Query metrics collected on our test cluster corroborate these lower level benchmark results, showing respectably better query speeds. To get a baseline I repeated the same set of queries every minute against the same datasources.

wiki-shapeshift-metrics

The segments were not exactly identical, but similar enough for comparison. The int column portion of wiki segments ranges from 5-15% of total smoosh size in my observations, and overall size reduction seems to be in the 1-5% range.

Datasource: 'Twitter'

Similar to 'wikiticker', the 'twitter' dataset is a realtime stream of twitter activity using the kafka indexing service. This dataset has smaller gains with shape shifting columns than wikipedia, but it still outperforms CompressedVSizeColumnarInts much of the time, even if only slightly.
Column sizes:

twitter-1-size-summary

Row select speed:

twitter-1-select-speed

Datasource: 'TPC-H Lineitem (1GB)'

A segment created by ingesting 1GB of the 'lineitem' schema adapted from the TPC-H dataset. This was a rather large segment with just over 6 million rows, and shape-shift does quite well here.
Column sizes:

lineitem-1-size-summary

Row select speed:

lineitem-1-select-speed

Integer columns overall are 15% smaller, and for many of the columns, select speeds are generally significantly faster. However, a few of the columns are marginally slower at low selectivity levels.

Datasource: 'Clarity' (metrics-like)

The 'clarity' dataset is one of our druid metrics datasources. Overall column size is actually larger here by 0.5%, but many of the columns have faster select speed.
Column sizes:

clarity-1-size-summary

Row select speed:

clarity-1-select-speed

Testing out a larger block size than is included in this PR (64k values per block), this strategy can beat CompressedVSizeColumnarInts in overall encoded size:

clarity-mega-size-summary

but I'm on the fence about the memory footprint of blocks this large, and have not tested select speed.

What's left

There are still a handful of things left to do, but I don't anticipate any major changes and would welcome feedback on this. Thanks for sticking with it if you made it this far; I know this is absurdly long for a PR description 😜

Todo:

  • Tighten up configuration and control knobs
  • Clean up 'todo' comments in code
  • More thorough unit tests
  • Test more segments from more datasources
  • Stress test a test cluster to find where stuff falls over. We've had this PR running in our test cluster for over a month mirror indexing a few datasources, but have yet to really hammer it.
  • Convince other people to test their own segments to confirm my results
  • Seriously, convince more people to test segments

Future Work

  • Extend strategy to all numerical columns
  • Smarter StupidPool that adapts object max cache size to load (or external library)
  • More experiments with encoding types, and JNI based decoder experiments using native versions of eagerly decoded algorithms like FastPFOR and RLE, to see if performance can be increased and the column structure simplified by avoiding the need for primitive arrays
  • Encoders/decoders as extension points
  • Collect metrics about encoded column composition
  • Investigate whether a sort of metrics feedback loop could help reduce encoding time by eliminating encoders which will likely perform poorly, or any other enhancements to make indexing smarter and more efficient

plot-column-bench.R.zip

@clintropolis
Member Author

I don't seem to be able to request reviews, but if I could I'd at least ask @leventov, @gianm, and @nishantmonu51 to have a look when convenient (and anyone else who is interested, the more eyes the better!)

@leventov
Member

Some high-level thoughts:

  • It would be really nice if along or before this PR something was done to make the segment format system more structured: Structured approach to segment format evolving and versioning #5347.

  • The query processing monomorphization framework (Monomorphic processing of TopN queries with simple double aggregators over historical segments (part of #3798) #4079) should now work per-block rather than per-interval, unless all encodings in the shape-shifting framework always do something like filling an int[] array or a ByteBuffer.

  • Could you please elaborate why Memory didn't work for you well?

  • A downside of the current pooling approach I have in place is that each different block size has its own set of pools to support it, so if multiple block sizes are used in practice, there will be a larger amount of heap and direct memory overhead.

    Why the same maximum-sized objects couldn't be used for blocks of any size, similarly how it is currently done with column compression buffers?

Also, in case you didn't see this article: https://lemire.me/blog/2018/04/17/iterating-in-batches-over-data-structures-can-be-much-faster/, it might give you some ideas about block sizing in general.

@himanshug
Contributor

himanshug commented Jul 24, 2018

This is impressive.

I haven't read the code yet, just the description. I had a few doubts...

First "Time to select rows" benchmark appears to have a peak just before 3M . Assuming that peak is at x M, it says performance would be better when selecting (x+delta) rows instead of x rows. I'm interested if there is an explanation for it. That peak shows up at different points in x-axis very consistently in all similar graphs.

In "The bad" section , it seems ShapeShiftingColumn will outperform current impl in all cases, if they could use blocks of varying sizes. That sounds great and deserves ateast validating that. If true, then I think it is well worth it even with a little bit of extra heap required given that this feature already requires re-tuning heap and maybe other jvm params.

Does the new design to read data increase heap requirements? If yes, that would not be the end of the world but deserves mention in the "bad" section (so re-tuning at historicals as well). Also, the new read design introduces mutation, and I hope this doesn't mean requiring locking etc. in the face of concurrent segment reads, or else that might cause more problems than it solves.
That said, it appears that minor changes here can whack the performance very easily (maybe even different jvm versions and hardware will have different performance). I'm unsure whether the same code produces different performance on different jvm versions and hardware. This is a hard problem.

@clintropolis
Member Author

@leventov thanks for starting to think about this!

It would be really nice if along or before this PR something was done to make the segment format system more structured: #5347.

I agree, I've been thinking about that for a while now too. I've been meaning to respond to that ticket for some time but got a bit side-tracked - I'll try to do that soon.

The query processing monomorphization framework (#4079) should now work per-block rather than per-interval, unless all encodings in the shape-shifting framework always do something like filling an int[] array or a ByteBuffer.

All currently either fill an int array or buffer (or constant), or allow reading from the base column buffer for the case of non-compressed byte packing. I do know the set of encoders used in the column up front, so I think at the least I can modify runtime shape inspection to adjust according to whether or not the int arrays and decompression buffer will be in play.

Could you please elaborate why Memory didn't work for you well?

Oops, I didn't intend to imply that it didn't work well... I think it's great and I look forward to being able to replace the 'unsafe' and 'buffer' pairs of near duplicate functions with Memory versions instead. I've misplaced my benchmarks on this, but in my tests Memory was faster than using ByteBuffer (but marginally slower than Unsafe) except in the case when CompressionStrategy is used, where it was slightly slower than using the buffer. While I didn't dig in very deep, my guess is it was maybe from the overhead of having to wrap the decompression ByteBuffer in a Memory object on every chunk load? That said, it was a very small amount slower, so it might be worth it to go ahead and just start using it anyway, and maybe get a slight performance bump in the future if we can get the lz4 decoder to use it. I will also try to find the time to repeat the experiment; it was done a while ago and some things have changed since then, so maybe the performance drop I saw with compression was unrelated and falsely attributed, or has since been cleared up in some other way.

Why the same maximum-sized objects couldn't be used for blocks of any size, similarly how it is currently done with column compression buffers?

They definitely could; my original line of thinking for the smaller block sizes was as much driven by my nervousness about adding heap usage on the query side as anything else, imagining that small block sizes would be used solely if an operator was afraid of heap impact, but I think that's not so sensible in retrospect. I think that bigger blocks always result in a better compression ratio, since more values can be considered at once, as well as lower metadata overhead (fewer chunk offsets, headers, etc), but random access time can be slightly better with smaller blocks for encodings which do not support random access (including byte level compression). I think it probably makes sense to have them all use the larger sized pool objects; I will go ahead and make that change.

Also, in case you didn't see this article: https://lemire.me/blog/2018/04/17/iterating-in-batches-over-data-structures-can-be-much-faster/, it might give you some ideas about block sizing in general.

I saw this 👍, and was planning on trying out a batching strategy as well as some loop unrolling to try to make the eager decoded versions a bit faster.

Also related to this line of thinking, @gianm has been working on an experimental branch that we've been testing that adds vectorized versions of aggregators, allowing them to select and process batches of row values which has also been producing some very promising results.

@clintropolis
Member Author

@himanshug Thanks for having a look!

First "Time to select rows" benchmark appears to have a peak just before 3M . Assuming that peak is at x M, it says performance would be better when selecting (x+delta) rows instead of x rows. I'm interested if there is an explanation for it. That peak shows up at different points in x-axis very consistently in all similar graphs.

My bad, I think I should've done a better job of trying to call out that the last datapoint is special on the 'time to select n rows' graphs. It represents the raw speed of a total scan, where the rest of the data points are collected using a BitSet to simulate a filter. The code in the benchmark test probably explains it best:

    IndexedInts encoder = encoders.get(encoding);
    if (filter == null) {
      // no filter: a full scan of every row, which produces the final 'total scan' data point
      for (int i = 0; i < rows; i++) {
        blackhole.consume(encoder.get(i));
      }
    } else {
      // simulated filter: only read the rows whose bits are set
      for (int i = filter.nextSetBit(0); i >= 0; i = filter.nextSetBit(i + 1)) {
        blackhole.consume(encoder.get(i));
      }
    }

Data visualization is hard 😄 Is it worth reworking the plots to not include this data point as part of the same line? Or perhaps as a separate 'total scan' line connected to another new 'time to select 1 row' data point? I also considered doing a filter of numRows - 1 to show it as the 'cliff' it is instead of producing what appears to be a peak.

In "The bad" section , it seems ShapeShiftingColumn will outperform current impl in all cases, if they could use blocks of varying sizes. That sounds great and deserves ateast validating that. If true, then I think it is well worth it even with a little bit of extra heap required given that this feature already requires re-tuning heap and maybe other jvm params.

It's less the varying that could achieve this than just having a block size that is equivalent (in terms of how many values it can hold) to the largest size CompressedVSizeColumnarIntsSerializer can produce, which is 2^16 values if all values can be represented with a single byte. Heap usage to support this block size in shape-shift is ~4x higher than it is with the current largest block size it offers (2^14 values), but I agree it is probably worth investigating.

Does the new design to read data increase heap requirements? If yes, that would not be the end of the world but deserves mention in the "bad" section (so re-tuning at historicals as well).

Yes, on the query side heap usage is in the form of primitive arrays that are pooled in the same manner as the direct bytebuffers used for decompressing values. I will edit the summary to clarify that it introduces heap usage on both indexing and query side.

Also, the new read design introduces mutation, and I hope this doesn't mean requiring locking etc. in the face of concurrent segment reads, or else that might cause more problems than it solves.

I believe this should all be contained within a single thread, so locking shouldn't be necessary.

@himanshug
Contributor

Is it worth reworking the plots to not include this data point as part of the same line?

No, the explanation is good enough for me; maybe just put that side note in the description, but no need to redo the plots.

Regarding the block size, I should probably read the code first to understand it more.

Contributor

@himanshug himanshug left a comment

Read through the serialization code so far... will continue reading through rest.

}

/**
* Whenb multiple 'complete' encoders are being employed, allow encoders which 'think' they will perform poorly for a
Contributor

s/Whenb/When

private int previousValue;
private int numValues = 0;
private boolean isFirstValue = true;

Contributor

could you add docs for numRunValues, numDistinctRuns, longestRun, numValues? Probably take stuff from the class level javadoc and put it next to the specific variable (basically the things that are exposed and not clear from their name, e.g. min/max value).

I could make sense of them by reading processNextRow(..) though.

Member Author

Javadocs were on the getters; re-arranged the code so the reader will have to scan over those before getting to processNextRow.

*
* @throws IOException
*/
void flushCurrentChunk() throws IOException
Contributor

I understand the argument presented in the PR description about the generics limitation and consequently addValue(int val) having to have access to this. Can we mark it "protected" though?
And maybe copy the limitation blurb to the class level javadocs as well?

}


if (bestCodec == null) {
Contributor

not sure why we are doing this check, this can't happen given that we are checking codecs.length > 0 in constructor.

bestCodec = codec;
bestSize = (int) modified;
}
}
Contributor

instead of nested if, why are we not doing

if ( codec.getEncodedSize(currentChunk, currentChunkPos, chunkMetrics)*codec.getSpeedModifier(chunkMetrics) < bestSize) {
  ...
}

and renaming int bestSize to double bestScore instead?

Also, if codec.getSpeedModifier(..) and codec.getEncodedSize(..) have to duplicate some computation, then we probably shouldn't have two methods for finding the "score".

Member Author

Ah, the fact that this part grew organically was showing. Originally I didn't have optimization targets and such and strictly compared sizes; the speed modifier and related stuff got added later. I've changed this to use a combined modified size call so it's more straightforward.

return BASE_HEADER_BYTES + (composition.size() * 5);
}

static void writeShapeShiftHeader(
Contributor

should be private.

Contributor

also, why static? Since it's static I'm having to look at the caller to ensure which variable is which, and doing a "find usage" on a global variable doesn't directly show its usage in writeShapeShiftHeader.

Member Author

Static by accident, updated as part of slight header refactor

protected final byte logValuesPerChunk;
protected final int valuesPerChunk;
protected final ByteBuffer intToBytesHelperBuffer;
protected final Map<FormEncoder, Integer> composition;
Contributor

it might be better to use fastutil Object2IntMap instead

Member Author

👍 will look into this

"shapeshiftSmallestIntsEncodedValuesArrayPool",
SMALLEST_INT_ARRAY_SIZE + ENCODED_INTS_SHOULD_BE_ENOUGH,
INT_ENCODED_ARRAY_POOL_MAX_CACHE
);
Contributor

Is it worth creating all the object pools for int[], which is always on-heap and fast to allocate, when they are not long lived, as appears to be the case? Did you find performance degradation without these pools?

It's a different story when they are direct ByteBuffers, whose allocation/deallocation is usually expensive.

Member Author

I think this is probably worth revisiting. I pooled these at the same time as I was trying to be more disciplined about the lifetime of the objects I needed, and query metrics on our test cluster were significantly better, but it also could have been other changes.

Not pooling definitely puts more work on the garbage collector; let's consider a full scan of a column with 3M rows using the largest block size, which maps to 184 chunks, where all chunks are encoded with FastPFOR. A 64k value array is needed for the lifetime of the column to hold the currently decoded chunk. Additionally, during each chunk transition we need another 65k temporary value array to copy the encoded values of the chunk to, and a FastPFOR codec object which weighs in around 1M on heap (plus ~264k direct buffers) to decode the 65k array contents into the 64k array. It is too heavy to just allocate one of each of these up front and keep them alive for the lifetime of the column, and the column only needs them for a very short period, so everything besides the 64k value array needs to be released somehow, or it makes things like merging and compaction very hard to do because of huge heap requirements. Additionally, the FastPFOR codec objects definitely need to be pooled, if only because of the direct buffers each instantiation also allocates internally.

If we just consider the value array and temp arrays, that is 185 arrays allocated and released (each requiring a contiguous chunk of memory), which is ~12.3MB. If the FastPFOR objects were also not pooled (ignoring the direct buffers, pretend they are pooled), it pushes the amount allocated and recycled up to ~200MB for a scan of a single column.

Still, with a large enough heap it may not be a problem? As soon as I have some spare time I'll consider testing again with the value and temp arrays, leaving the codec object pool as is.

Contributor

thanks for clarifying some of the things for me. I think it is fair to measure and do whatever performs best.

I guess that not pooling temp arrays would be ok as they are short lived (especially with the optimizations done to G1GC). It might be ok to not pool value arrays as well, given that we are not operating at the limits of available heap. I haven't read through the FastPFOR code yet, but it sounds like it has direct buffers, which are definitely better pooled.

public void initializeChunk()
{
unencodedValuesHolder = CompressedPools.getShapeshiftIntsDecodedValuesArray(logValuesPerChunk);
currentChunk = unencodedValuesHolder.get();
Contributor

we need just one int[] as things happen sequentially. would it not be simpler to allocate that array instead of using an object pool?

* layout:
* | header (byte) | encoded values (numOutputInts * Integer.BYTES) |
*/
public final class LemireIntFormDecoder extends BaseFormDecoder<ShapeShiftingColumnarInts>
Contributor

so if Lemire had another algorithm for encoding/decoding ints, what would that class be called ? :-)

maybe call these classes FastPfor[Encoder/Decoder]?

Member Author

There are actually a variety of different algorithms in the JavaFastPFOR library and this encoder/decoder pair supports any that implement SkippableIntegerCodec.

I've only added a top level mapping for FastPFOR because it's the only one I've tested extensively, but I was planning on benchmarking some of the additional algorithms available in the library at some point to determine if any are worth adding to the mix.

That said, this was the original naming, and if the rest prove lame I can certainly make this change.

Contributor

I didn't realize it was meant to support multiple algorithms. The name makes sense in that case.

@clintropolis
Member Author

@leventov @himanshug I think I've got another viable, maybe even better, variant of this general idea that I can craft with relatively minor changes - that could eliminate the need for primitive arrays entirely and move everything back to off-heap direct buffers and even help simplify the code quite a bit.

My weekend fun hack project (which has also spilled into every night this week) was to write a JNI wrapper around the native version of FastPFOR, and then plug that and all of its algorithms in as another encoder/decoder option to experiment with. This was an itch I've wanted to scratch since I started working with this stuff, since I was curious how the java implementation compares to calling native code from java. I have a lot more testing and benchmarking to do, and the simd versions of codecs seem to be finicky about memory alignment, but it seems possible to achieve even better performance gains going native, based on my limited observations so far. This is using the same direct buffers from the compression pool as lz4 bytepacking, so the memory footprint if we go this way should be very similar to what it is now (plus whatever the native code is allocating).

The major downside is that the FastPFOR algorithm implementations do not seem compatible with each other, so it could be painful to switch later on (at least between the SIMD version and the Java version; I haven't tried the non-SIMD version against the Java version yet, so maybe there is still hope). I suppose it is also possible that this is a bug in one of the libraries.

There would be some consideration of how we would want to maintain this native mapping. I'm currently building all the native parts by hand and stuffing them as resources in a standalone package which I can install with Maven locally to test, but I'm a bit fuzzy on where to go from there and don't really know what the legit way to do this is (I was modeling it on the lz4 native library).

I might be getting ahead of myself, but if we were to pursue this approach, I would assume we want to maintain this as a package in Druid, maybe something like druid-native-processing? I think we want a package somewhere which could hold the native Java sources, the JNI headers and sources, maybe git submodules of 3rd party native libraries, and pre-built versions of those libraries in the resources of the package. I think it would probably be a pain to set up cross compilation to build the native libs that are packaged in the resources in a CI way, but I think it is useful at least to be able to build them from within the package manually. There are some Maven plugins dealing with building native stuff that I need to look further into if we get serious about this.

I'm going to keep playing with this to see if I can get it operating smoothly. A refactor should be relatively painless and quick; I'll make a branch to sketch out what it might look like if further testing is promising.

@himanshug
Contributor

Other challenges with [not so mature] native code are that it is very hard to debug (especially for Java programmers who aren't that familiar with the necessary tooling, the extra setup required, and the language itself). Also, bugs in native code could crash the JVM leaving little trace of what happened, and getting in/out of JNI is usually more expensive.

But again, experimenting is great.

@himanshug himanshug left a comment

I read the deserialization code... I haven't gone through the encoder/decoder implementations yet.

@Override
public ColumnarInts get()
{
Map<Byte, Integer> composition = columnData.getComposition();
Contributor

Byte2IntArrayMap might be better here to prevent unnecessary boxing/unboxing... this is probably a general comment for all such places.
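A tiny sketch of that suggestion, using fastutil's primitive map so the codec header bytes and chunk counts never get boxed (the values are made up; only the map classes come from fastutil):

import it.unimi.dsi.fastutil.bytes.Byte2IntArrayMap;
import it.unimi.dsi.fastutil.bytes.Byte2IntMap;

public class CompositionMapSketch
{
  public static void main(String[] args)
  {
    // keys are codec header bytes, values are how many chunks used that codec
    Byte2IntMap composition = new Byte2IntArrayMap();
    composition.put((byte) 0x01, 128);
    composition.put((byte) 0x02, 12);

    // primitive get, no Byte/Integer boxing on the hot path
    int chunksUsingCodec1 = composition.get((byte) 0x01);
    System.out.println(chunksUsingCodec1);
  }
}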

Map<Byte, Integer> composition = columnData.getComposition();

Map<Byte, FormDecoder<ShapeShiftingColumnarInts>> decoders = IntCodecs.getDecoders(
composition.keySet().stream().collect(Collectors.toList()),
Contributor

Can we take a Collection as the arg in IntCodecs.getDecoders(..) so that this list creation is not needed?

Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
return (Unsafe) theUnsafe.get(null);
}
Contributor

This code is repeated in many different places in the Druid codebase... Can we move it to some util class and then use the same one in all those places?

Member Author

👍 will try to find a home
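For instance, one possible home (the class and method names are just a sketch) that centralizes the reflection shown in the snippet above:

import sun.misc.Unsafe;
import java.lang.reflect.Field;

// Sketch of a shared utility wrapping the 'theUnsafe' reflection hack; names are illustrative.
public final class UnsafeUtils
{
  private static final Unsafe UNSAFE = lookupUnsafe();

  private UnsafeUtils() {}

  public static Unsafe theUnsafe()
  {
    return UNSAFE;
  }

  private static Unsafe lookupUnsafe()
  {
    try {
      Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
      theUnsafe.setAccessible(true);
      return (Unsafe) theUnsafe.get(null);
    }
    catch (NoSuchFieldException | IllegalAccessException e) {
      throw new RuntimeException("Unsafe is not available", e);
    }
  }
}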


protected final GetIntBuffer oddSizeValueGet;
protected final GetIntUnsafe oddSizeValueGetUnsafe;
ResourceHolder<int[]> decodedValuesHolder;
Contributor

private ?

{
final int startOffset = columnarInts.getCurrentValuesStartOffset();
final int currentConstant = columnarInts.getCurrentValueBuffer().getInt(startOffset);
Arrays.fill(columnarInts.getDecodedValues(), currentConstant);
Contributor

This appears to be unnecessary copying, forced by the way the abstractions are set up (same comment for the copying in ZeroIntFormDecoder).
Maybe make get(int index) in ShapeShiftingColumnarInts call get(int[] transformedValues, int i); then the implementation in these decoders can just ignore the array altogether and return the same value.
That said, from your notes in the PR it appears that you saw perf degradations due to inlining not happening, and there is a chance that my suggestion would actually lower performance, but I thought I would mention it anyway.
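A small sketch of the shape being suggested, where the per-form read path receives the decoded array but a constant form can ignore it entirely (the interface and class names are hypothetical, not the PR's actual types):

// Hypothetical sketch of the suggested abstraction, not the PR's actual interfaces.
interface IntChunkReader
{
  // default read path: look the value up in the decoded array
  int get(int[] decodedValues, int index);
}

// a constant chunk never needs the array filled; it just returns its single value
final class ConstantChunkReader implements IntChunkReader
{
  private final int constant;

  ConstantChunkReader(int constant)
  {
    this.constant = constant;
  }

  @Override
  public int get(int[] decodedValues, int index)
  {
    return constant;
  }
}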

Member Author

Yeah, along with my native experiments in another branch, I'm also playing around with refactoring decoding in this branch to eliminate the unnecessary 'decode to array' implementations, since I don't think they are worth keeping around, but I need to benchmark to prove it. If the performance cost isn't noticeable, this should allow me to have only a single transform method per decoder and simplify things a bit.

final int startOffset = columnarInts.getCurrentValuesStartOffset();
final int currentConstant = columnarInts.getCurrentValueBuffer().getInt(startOffset);
columnarInts.setCurrentBytesPerValue(0);
columnarInts.setCurrentConstant(currentConstant);
Contributor

The transformUnsafe and transformBuffer impls look exactly the same... one can call the other? Or is this intentional because of method inlining not happening?

Member Author

Yeah, they are exactly the same in this decoder (and zero), and nearly the same in most other implementations since they have a fair bit in common. I think it makes sense to combine these methods: it doesn't hurt to always set a current buffer offset during transform, and also, if the buffer is direct, a current buffer address in the column, so these can easily be consolidated.

@Override
public void inspectRuntimeShape(final RuntimeShapeInspector inspector)
{
// todo: idk
Contributor

There are many todos in different places; I guess they are there because this is still in progress and will be figured out before this PR is ready.

@fjy fjy modified the milestones: 0.12.3, 0.13.0 Aug 13, 2018
@clintropolis
Member Author

Just a note if anyone is testing this with their own data: the encoding selection logic change in 3bdd6bd caused a behavior change that is resulting in larger column sizes; I'm investigating.

writer.write(maskedCounter);
writer.write(values[prev]);
runCounter = 1;
} else {
Contributor

While reading the encoder code, I wasn't sure why the additional branching for runCounter > 2 was needed; then I read the decoder code, and I "think" this branching is done because the decoder is faster with non-run-length-encoded values. If that sounds right, then maybe add a comment here explaining this branching.
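If that guess is right, the trade-off can be shown with a toy version of the scheme (this is not the PR's wire format, just an illustration): if a run is written as a flagged count plus the value, a run of length 2 costs the same two ints as two literals while adding decode branching, so presumably only runs longer than 2 are worth encoding as runs.

import java.util.ArrayList;
import java.util.List;

// Toy sketch of a hybrid run-length scheme (not the PR's actual format): runs of
// length <= 2 are folded into literal groups because they save no space and literal
// groups decode as a straight copy.
public class RunLengthSketch
{
  private static final int RUN_FLAG = 0x80000000;

  public static List<Integer> encode(int[] values)
  {
    List<Integer> out = new ArrayList<>();
    List<Integer> pendingLiterals = new ArrayList<>();
    int i = 0;
    while (i < values.length) {
      int runLength = 1;
      while (i + runLength < values.length && values[i + runLength] == values[i]) {
        runLength++;
      }
      if (runLength > 2) {
        flushLiterals(out, pendingLiterals);
        out.add(RUN_FLAG | runLength); // run group: flagged count, then the value once
        out.add(values[i]);
      } else {
        for (int j = 0; j < runLength; j++) {
          pendingLiterals.add(values[i]); // short runs become plain literals
        }
      }
      i += runLength;
    }
    flushLiterals(out, pendingLiterals);
    return out;
  }

  private static void flushLiterals(List<Integer> out, List<Integer> literals)
  {
    if (!literals.isEmpty()) {
      out.add(literals.size()); // literal group: unflagged count, then the values verbatim
      out.addAll(literals);
      literals.clear();
    }
  }
}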

RunLengthBytePackedIntFormEncoder.getNumBytesForMax(metrics.getMaxValue(), metrics.getLongestRun());
final int bytepackSize = numBytesBytepack * metrics.getNumValues();
// don't bother if not smaller than bytepacking
if (size > bytepackSize) {
Contributor

size >= bytepackSize ?

encodeValues(writer, values, numValues, numBytes);

// pad if odd length
if (numBytes == 3) {
Contributor

Can you add a comment on why this is necessary (if it is for alignment, shouldn't it depend on how many bytes were actually written into the buffer by the encodeValues(..) call?) and why it doesn't apply for numBytes == 1?

Contributor

I think I got it from the decoder code for numBytes == 3; however, it only applies to that case and not to odd sizes in general, even though 1 is also odd... a comment here would be useful.
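In case it helps the comment along, here is my guess at the mechanics as a standalone sketch (this is an assumption about the decoder, not taken from the PR): if 3-byte values are read by fetching a full int and masking, the read of the last value touches one byte past the packed data, which would explain why a single pad byte is needed for numBytes == 3 but not for 1, 2, or 4.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Assumed decoding pattern for 3-byte packed values, for illustration only.
public class ThreeBytePackingSketch
{
  public static int getThreeByteValue(ByteBuffer buffer, int startOffset, int index)
  {
    // reads 4 bytes and keeps the low 3, so decoding the final value requires one
    // trailing pad byte to be present in the buffer
    return buffer.order(ByteOrder.LITTLE_ENDIAN).getInt(startOffset + index * 3) & 0xFFFFFF;
  }
}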

encodeValues(writer, values, numValues, numBytes);

// pad if odd length
if (numBytes == 3) {
Contributor

same here

}
}

private int projectSize(IntFormMetrics metadata)
Contributor

Isn't this computing the exact size? 'project' gave me the hint that it was an estimated size, but it is not?

Member Author

Good point, will rename

@clintropolis
Member Author

Ok, apologies for the delay in providing an update on the native experiments. The short version is that the numbers look better to me, especially when retrieving a smaller number of rows, and I find the implementation cleaner. You can compare the branch to this branch here: clintropolis/druid@shapeshift...clintropolis:shapeshift-native

My first set of experiments was done with the 'native' branch in a hybrid state where both FastPFor implementations could be tested side by side; once I had collected enough better results, I was compelled to go ahead and completely sketch out the implementation.

(benchmark plot: lineitem-1 select speed, native FastPFor vs. JavaFastPFor)

I tested the branch in this state on both my macOS dev laptop and on a c5.large running Linux in AWS; both environments show a similar performance difference testing FastPFor vs JavaFastPFor.

After fully transforming the shapeshift-native branch, I've also started to run the complete set of benchmarks I've collected for this PR. Here is a composite of the results of this branch and the native branch for the 'wiki-2' segment:

(benchmark plot: wiki-2 select speed, Java branch vs. native branch)

I like the native implementation better personally: it's faster, has no large heap arrays, and I find it more straightforward. But I only consider it halfway refactored, as the serialization side still relies on int arrays rather than direct buffers. Switching serialization to buffers may also be the path to having a strategy like CompressedVSizeColumnarIntsSerializer, where we could produce larger blocks without increasing the impact on indexing, though I think it would have the effect of requiring larger buffers on the decode side (256KB instead of 64KB, to support 64k-value blocks), which might be worth it in order for 'shapeshift' to win all the time on both speed and size.

The native stuff is provided through a new top level project, druid-processing-native, which has both Java and C++ code, as well as the C++ version of FastPFor pulled in as a submodule. I modeled the setup roughly on the lz4 Java implementation we use, and in fact borrowed the resource library loader code with minor modification. Currently, the native code is built into a libDruidProcessingNative for each platform (macOS and Linux so far) and placed in the resources so it will be present in the jar, similar to how the lz4 native Java library we use does it. This is currently done with a shell script, build.sh, which initializes the git submodule, builds FastPFor (requires cmake), and builds the C++ JNI sources, placing the resulting library for the platform being built (libDruidProcessingNative.so or libDruidProcessingNative.dylib) in the resources. This build is done entirely out of band from the Maven build at the moment, and only really needs to be done if C++ sources directly in the project are modified, OR if we want to update the FastPFor library itself. The Maven part of the build just builds the Java side of things, packing the native objects in the resulting jar.
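For anyone curious what "borrowed the resource library loader code" amounts to, here is a rough, simplified sketch of that pattern (the resource path and library name follow the description above, but the code itself is illustrative, not the branch's actual loader):

import java.io.File;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

// Sketch of the lz4-java style loader: the platform-specific library is shipped
// inside the jar and extracted to a temp file before System.load makes it available.
public class NativeLibraryLoaderSketch
{
  public static synchronized void load() throws Exception
  {
    String os = System.getProperty("os.name").toLowerCase();
    String extension = os.contains("mac") ? ".dylib" : ".so";
    String resource = "/libDruidProcessingNative" + extension;

    try (InputStream in = NativeLibraryLoaderSketch.class.getResourceAsStream(resource)) {
      if (in == null) {
        throw new IllegalStateException("native library not packaged for this platform: " + resource);
      }
      File tmp = File.createTempFile("libDruidProcessingNative", extension);
      tmp.deleteOnExit();
      Files.copy(in, tmp.toPath(), StandardCopyOption.REPLACE_EXISTING);
      System.load(tmp.getAbsolutePath()); // makes the JNI symbols visible to the JVM
    }
  }
}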

Next I plan to follow up and see why the Java and C++ FastPFor libraries do not seem compatible, as it would be nice to have the option to fall back to the Java implementation. Additionally, I want to find out whether it's a bug that I have to do a memory copy when the address of the chunk I want to decode isn't aligned to the nearest 16 bytes (fixing that may make the native implementation even faster, since the chunk is most often not aligned).

I also intend to keep working my way through the benchmarks with the native implementation and comparing them to the Java version. More importantly, I'm going to test it out on an actual cluster and try to confirm it behaves as expected in the real world too.

@clintropolis
Member Author

Apologies for the lack of activity on this branch; I just wanted to give an update that it is not in fact dead (and I'm still hyped about these changes). I just got a bit sidetracked with a ton of other things and will be resuming work very soon, as well as addressing the segment format discussion, etc.

Additionally, I discussed the implications of the Java and C++ versions of FastPFOR not being compatible with @gianm, and based on that, I think we've decided that FastPFOR is more of a strategy at this point in time than a well-defined codec, so it is in our interest to bring the relevant algorithm implementation(s) into Druid rather than use the library, and just own it like we have done with a handful of other things, like Concise. Given this, I am going to attempt to put together a Java implementation that is compatible with the C++ implementation so we can still have the opportunity to take advantage of native code.

I will also be repeating all benchmarks to take the performance enhancements in the LZ4 update into account.

@clintropolis
Member Author

This branch was pretty far behind; for my sanity, to get it onto latest I've rebased and squashed the history after addressing, I think, all but one comment (the centralized "unsafe"). The commit history is available here. I still need to resolve the FastPFOR situation mentioned in my last comment; I'm looking into that next.

I've re-run one set of benchmarks, and this approach is still on top even after the lz4 performance improvements from #6478:
(benchmark plot: updated results after rebase)

@clintropolis
Member Author

I haven't forgotten about this branch; it has been in the back of my mind for a long time now (..like a really long time at this point, heh). This PR is a really promising proof of concept, but it's still sort of rough around the edges and I think I can do even better. During this time, I've been thinking of some ideas on how this can better play into the vectorized query engine, as well as how to always beat the segment sizes of the current approach. Of course I also still need to resolve the C++/Java compatibility issue, because the C++ performance is too good to ignore (and it is also what LZ4 is doing, so not without precedent in Druid), but we need a compatible Java implementation in the event the native module is not able to load.

My main blocker is that I have just been waiting until I have a contiguous chunk of time to resolve these issues, which with any luck I anticipate will be in a couple of weeks over the holidays 🎄

@stale

stale bot commented Dec 13, 2019

This issue is no longer marked as stale.

@stale stale bot removed the stale label Dec 13, 2019
@stale

stale bot commented Feb 11, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Feb 11, 2020
@stale

stale bot commented Apr 11, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Apr 11, 2020
@stale

stale bot commented Jun 12, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@lgtm-com

lgtm-com bot commented Jun 23, 2020

This pull request introduces 1 alert when merging 0ecf2be into eee99ff - view on LGTM.com

new alerts:

  • 1 for Result of multiplication cast to wider type

@stale

stale bot commented Aug 22, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Aug 22, 2020
@stale

stale bot commented Nov 7, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Nov 7, 2020
@clintropolis clintropolis removed the stale label Nov 7, 2020
@clintropolis clintropolis added the Evergreen Stalebot will ignore this issue label Dec 2, 2020
@stale

stale bot commented Apr 30, 2022

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@github-actions

github-actions bot commented Jul 3, 2023

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.
