
[Proposal] Vectorized Aggregations #3011

Closed

xvrl opened this issue May 25, 2016 · 7 comments

@xvrl
Member

xvrl commented May 25, 2016

It seems like vectorization is all the rage these days across the big data world, and a few people have pinged me about the work I had started at some point, but never quite finished.

The gist of the initial work can be found in the following branch (this is just a proof of concept for a no-filter double-sum timeseries and a multi-value top-N):
https://github.com/druid-io/druid/compare/master...metamx:vectorize-take2?expand=1

The basic idea is to add two new methods on the aggregators:

  • a block aggregation method that can aggregate an entire column chunk at once
  • a parallel aggregation method that can aggregate multiple buffers at once.

The parallel aggregation can be used to aggregate multiple dimension values at once in top-N, for example.

The block aggregation can massively speed up full column aggregations, thanks to the JVM being able to emit data-parallel instructions (e.g. AVX) when you loop-unroll the aggregations.
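
To make this concrete, here is a rough sketch of what those two methods might look like on BufferAggregator; the names and signatures below are illustrative, not necessarily what the branch uses:

import java.nio.ByteBuffer;

public interface BufferAggregator
{
  void aggregate(ByteBuffer buf, int position); // existing row-at-a-time method

  // Hypothetical block aggregation: fold rows [startRow, endRow) of the
  // selector's current column chunk into the single accumulator at `position`.
  void aggregateBlock(ByteBuffer buf, int position, int startRow, int endRow);

  // Hypothetical parallel aggregation: fold row i into positions[i], so that
  // one pass over the column can update many accumulators at once (e.g. one
  // per dimension value in a top-N).
  void aggregateParallel(ByteBuffer buf, int[] positions, int numRows);
}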

For block aggregation however, it is not enough to just loop-unroll. To achieve significant improvements, we also want to do block reads from the column and avoid materializing the filter offsets if possible.

To that end, we also need to add:

  • Methods to block-read the columns, returning arrays of float / long
  • Methods to block-increment the offsets.
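
For example (hypothetical method names; the proof-of-concept branch may spell these differently):

public interface FloatColumnSelector
{
  float get(); // existing row-at-a-time read

  // Hypothetical block read: copy `length` consecutive values starting at
  // `rowOffset` into the caller-provided array, avoiding a virtual call per row.
  void getBlock(int rowOffset, float[] out, int length);
}

public interface Offset
{
  void increment(); // existing single-row advance

  // Hypothetical block increment: advance n rows at once.
  void increment(int n);
}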

That's where things stand in the current proof of concept, which according to the benchmarks in #2875 gives us a 2.6x speedup for a single-column double-sum timeseries without filter, and a 1.6x speedup for a double-sum top-N on a multi-value column with 6 values per row.

# Run complete. Total time: 00:02:13

Benchmark                                      (numSegments)  (rowsPerSegment)  (schemaAndQuery)  (vectorize)  Mode  Cnt      Score     Error  Units
TimeseriesBenchmark.querySingleQueryableIndex              1            750000           basic.A         true  avgt   25   9557.090 ± 182.812  us/op
TimeseriesBenchmark.querySingleQueryableIndex              1            750000           basic.A        false  avgt   25  25360.562 ± 597.906  us/op

# Run complete. Total time: 00:02:15

Benchmark                                (numSegments)  (rowsPerSegment)  (schemaAndQuery)  (threshold)  (vectorize)  Mode  Cnt       Score      Error  Units
TopNBenchmark.querySingleQueryableIndex              1            750000           basic.A           10         true  avgt   25   95951.322 ± 1541.184  us/op
TopNBenchmark.querySingleQueryableIndex              1            750000           basic.A           10        false  avgt   25  157396.914 ± 2099.387  us/op

So far those changes only handle the non-filtered aggregation. In order to support efficient aggregation for filtered queries, we also need to manage filtered offsets in a block-like manner.

I talked a little bit more about it with @gianm, and based on our conversation we've come up with the following ideas to tackle this:

We can change ReadableOffset to return an array of the offsets that match within the current block, e.g.

public interface ReadableOffset
{
  /**
   * Returns either an array of offsets for the current row block (if non-contiguous) or an array in which the first
   * element is negative and indicates -first_offset for a contiguous block.
   */
  int[] getOffsets();
}

The offset array can then be pre-allocated by the engine and used by the column selector to return a coalesced block of the values that should be aggregated.
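
Concretely, a consumer could decode that convention along these lines (a sketch; getBlock, get(int), values, and blockSize are assumptions, not part of the interface above):

int[] offsets = offset.getOffsets();
if (offsets[0] < 0) {
  // Contiguous block: the first element encodes -first_offset, so the
  // selector can serve it with a single block read.
  int firstOffset = -offsets[0];
  column.getBlock(firstOffset, values, blockSize);
} else {
  // Non-contiguous block: gather the listed offsets one by one.
  for (int i = 0; i < offsets.length; i++) {
    values[i] = column.get(offsets[i]); // hypothetical random-access read
  }
}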

Note: these blocks may need to be cached to avoid thrashing of compression buffers when we have multiple selectors for each column.

To simplify things further, we'll probably want to get rid of on-heap aggregators, so that we don't have to maintain four different implementations of each aggregator, and can have just vectorized and non-vectorized ones at the query engine level (we can keep the on-heap non-vectorized ones for merging).

We'd love to hear people's thoughts, and @gianm please feel free to add anything noteworthy I may have left out from our conversation. Thanks!

@xvrl xvrl added the Discuss label May 25, 2016
@gianm
Contributor

gianm commented May 25, 2016

I can back this statement up:

For block aggregation however, it is not enough to just loop-unroll. To achieve significant improvements, we also want to do block reads from the column and avoid materializing the filter offsets if possible.

I did test out what performance would be like if we only unrolled aggregator loops (but didn't do anything smart on the column-reading side, just looped over the existing Offset and column-reading interfaces). The speedup on a single-longSum-agg timeseries query was about 20%, which is good but not as good as the 60% that @xvrl got by also doing block reads and avoiding offset materialization.
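
For reference, the unrolled variant looks roughly like this (a sketch over a pre-read values array, not the actual benchmark code). Keeping four independent accumulators breaks the serial dependency chain between additions, which is what gives the JIT room to pipeline them:

double sum0 = 0, sum1 = 0, sum2 = 0, sum3 = 0;
int i = 0;
for (; i + 4 <= numRows; i += 4) {
  sum0 += values[i];
  sum1 += values[i + 1];
  sum2 += values[i + 2];
  sum3 += values[i + 3];
}
for (; i < numRows; i++) {
  sum0 += values[i]; // leftover tail rows
}
double sum = sum0 + sum1 + sum2 + sum3;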

Since @xvrl's implementation didn't support filters though, it remains to be seen where in between 20% and 60% we would end up with something that does support filters but is also a bit smarter than just looping over the existing interfaces like I did.

@sascha-coenen

Does this PR relate to Apache Arrow in some way? I believe I read somewhere that you guys had this framework under consideration at some point.

@fjy
Contributor

fjy commented May 29, 2016

@sascha-coenen No.

We've never really considered Arrow. However, the issue that, once completed, will eventually lead to Druid supporting extendable segment formats (including Arrow) is here: #2965

@sascha-coenen

I don't mean to spam your discussion with my input, as I'm not too knowledgeable about Druid internals, but if I may, I'd like to propose something, without knowing whether things already work this way or whether the proposal is sound.

To my understanding, segments currently do not contain any aggregates coarser-grained than the rolled-up records.
If a segment's block for a given metric carried an aggregate value for the whole block, one could reduce the computational cost of queries: if the bitmap of records to aggregate over has fewer than half of the bits set that make up one block, one would sum those values up as usual; if it has more than half set, one would aggregate over the inverted bitmap and subtract the result from the pre-aggregated value stored for that block.
This would ensure that in the worst case, only half of a block's records have to be aggregated.
Seeing how query latency is mostly determined by how many records remain to be aggregated after filtering, the opportunity to cut these in half seems worth the cost of holding the pre-aggregated values.
Where to keep those aggregates is of course another topic; they could go into the segments or be kept in some cache.
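
A minimal sketch of that per-block decision, using a boolean[] as a stand-in for the bitmap and assuming the block's pre-aggregated total is available:

static double sumBlock(boolean[] matches, double[] values, double blockTotal)
{
  int matchCount = 0;
  for (boolean m : matches) {
    if (m) {
      matchCount++;
    }
  }

  double sum = 0;
  if (matchCount <= matches.length / 2) {
    // Few rows match: aggregate them directly, as usual.
    for (int i = 0; i < matches.length; i++) {
      if (matches[i]) {
        sum += values[i];
      }
    }
    return sum;
  }
  // Most rows match: aggregate the inverted bitmap and subtract the result
  // from the pre-aggregated value stored for the block.
  for (int i = 0; i < matches.length; i++) {
    if (!matches[i]) {
      sum += values[i];
    }
  }
  return blockTotal - sum;
}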

@xvrl
Member Author

xvrl commented Jun 7, 2016

@sascha-coenen it sounds like what you are proposing is some form of materialized views. The question then would be what time granularity to choose for those pre-aggregated records, since that may differ from the underlying segment granularity. Your example would work for simple sum aggregations, but it would not necessarily extend to complex aggregations, or even min / max, since those do not support subtracting rows (if the block max comes from a filtered-out row, the max of the remaining rows cannot be recovered by subtraction). It sounds like this could help in a small number of use cases, but I'm not sure it's worth the added complexity. Given that this is unrelated to the topic of this issue, I suggest opening a separate issue if you'd like to discuss further.

gianm added a commit to gianm/druid that referenced this issue Jan 2, 2019
This patch includes vectorized timeseries and groupBy engines, as well
as some analogs of your favorite Druid classes:

- VectorCursor is like Cursor. (It comes from StorageAdapter.makeVectorCursor.)
- VectorColumnSelectorFactory is like ColumnSelectorFactory, and it has
  methods to create analogs of the column selectors you know and love.
- VectorOffset and ReadableVectorOffset are like Offset and ReadableOffset.
- VectorAggregator is like BufferAggregator.
- VectorValueMatcher is like ValueMatcher.

There are some noticeable differences between vectorized and regular
execution:

- Unlike regular cursors, vector cursors do not understand time
  granularity. They expect query engines to handle this on their own,
  which a new VectorCursorGranularizer class helps with. This is to
  avoid too much batch-splitting and to respect the fact that vector
  selectors are somewhat more heavyweight than regular selectors.
- Unlike FilteredOffset, FilteredVectorOffset does not leverage indexes
  for filters that might partially support them (like an OR of one
  filter that supports indexing and another that doesn't). I'm not sure
  that this behavior is desirable anyway (it is potentially too eager)
  but, at any rate, it'd be better to harmonize it between the two
  classes. Potentially they should both do some different thing that
  is smarter than what either of them is doing right now.
- When vector cursors are created by QueryableIndexCursorSequenceBuilder,
  they use a morphing binary-then-linear search to find their start and
  end rows, rather than linear search.

Limitations in this patch are:

- Only timeseries and groupBy have vectorized engines.
- GroupBy doesn't handle multi-value dimensions yet.
- Vector cursors cannot handle virtual columns or descending order.
- Only some filters have vectorized matchers: "selector", "bound", "in",
  "like", "regex", "search", "and", "or", and "not".
- Only some aggregators have vectorized implementations: "count",
  "doubleSum", "floatSum", "longSum", "hyperUnique", and "filtered".
- Dimension specs other than "default" don't work yet (no extraction
  functions or filtered dimension specs).

Currently, the testing strategy includes adding vectorization-enabled
tests to TimeseriesQueryRunnerTest, GroupByQueryRunnerTest,
GroupByTimeseriesQueryRunnerTest, CalciteQueryTest, and all of the
filtering tests that extend BaseFilterTest. In all of those classes,
there are some test cases that don't support vectorization. They are
marked by special function calls like "cannotVectorize" or "skipVectorize"
that tell the test harness to either expect an exception or to skip the
test case.

Testing should be expanded in the future -- a project in and of itself.

Related to apache#3011.
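
As an aside, the "morphing binary-then-linear search" mentioned above can be sketched as follows (illustrative only; the real logic lives in QueryableIndexCursorSequenceBuilder, and the cutover threshold of 16 is an arbitrary choice here). It assumes row timestamps sorted ascending and returns the first row at or after the target:

static int findFirstAtOrAfter(long[] timestamps, long target, int lo, int hi)
{
  // Binary phase: halve the search range while it is still large.
  while (hi - lo > 16) {
    int mid = (lo + hi) >>> 1;
    if (timestamps[mid] < target) {
      lo = mid + 1;
    } else {
      hi = mid;
    }
  }
  // Linear phase: once the range is small, a short scan beats further halving.
  while (lo < hi && timestamps[lo] < target) {
    lo++;
  }
  return lo;
}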
@stale

stale bot commented Jun 21, 2019

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 2 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Jun 21, 2019
@stale

stale bot commented Jul 5, 2019

This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.

@stale stale bot closed this as completed Jul 5, 2019
gianm added a commit that referenced this issue Jul 12, 2019
* Benchmarks: New SqlBenchmark, add caching & vectorization to some others.

- Introduce a new SqlBenchmark geared towards benchmarking a wide
  variety of SQL queries. Rename the old SqlBenchmark to
  SqlVsNativeBenchmark.
- Add (optional) caching to SegmentGenerator to enable easier
  benchmarking of larger segments.
- Add vectorization to FilteredAggregatorBenchmark and GroupByBenchmark.

* Query vectorization.

* WIP

* Adjustments for unused things.

* Adjust javadocs.

* DimensionDictionarySelector adjustments.

* Add "clone" to BatchIteratorAdapter.

* ValueMatcher javadocs.

* Fix benchmark.

* Fixups post-merge.

* Expect exception on testGroupByWithStringVirtualColumn for IncrementalIndex.

* BloomDimFilterSqlTest: Tag two non-vectorizable tests.

* Minor adjustments.

* Update surefire, bump up Xmx in Travis.

* Some more adjustments.

* Javadoc adjustments

* AggregatorAdapters adjustments.

* Additional comments.

* Remove switching search.

* Only missiles.