Time Ordering On Scans (#7133)
* Moved Scan Builder to Druids class and started on Scan Benchmark setup

* Need to form queries

* It runs.

* Stuff for time-ordered scan query

* Move ScanResultValue timestamp comparator to a separate class for testing

* Licensing stuff

* Change benchmark

* Remove todos

* Added TimestampComparator tests

* Change number of benchmark iterations

* Added time ordering to the scan benchmark

* Changed benchmark params

* More param changes

* Benchmark param change

* Made Jon's changes and removed TODOs

* Broke some long lines into two lines

* nit

* Decrease segment size for less memory usage

* Wrote tests for heapsort scan result values and fixed bug where iterator
wasn't returning elements in correct order

* Wrote more tests for scan result value sort

* Committing a param change to kick teamcity

* Fixed codestyle and forbidden API errors

* .

* Improved conciseness

* nit

* Created an error message for when someone tries to time order a result
set > threshold limit

* Set to spaces over tabs

* Fixing tests WIP

* Fixed failing calcite tests

* Kicking travis with change to benchmark param

* added all query types to scan benchmark

* Fixed benchmark queries

* Renamed sort function

* Added javadoc on ScanResultValueTimestampComparator

* Unused import

* Added more javadoc

* improved doc

* Removed unused import to satisfy PMD check

* Small changes

* Changes based on Gian's comments

* Fixed failing test due to null resultFormat

* Added config and get # of segments

* Set up time ordering strategy decision tree

* Refactor and pQueue works

* Cleanup

* Ordering is correct on n-way merge -> still need to batch events into
ScanResultValues

* WIP

* Sequence stuff is so dirty :(

* Fixed bug introduced by replacing deque with list

* Wrote docs

* Multi-historical setup works

* WIP

* Change so batching only occurs on broker for time-ordered scans

Restricted batching to broker for time-ordered queries and adjusted
tests

Formatting

Cleanup

* Fixed mistakes in merge

* Fixed failing tests

* Reset config

* Wrote tests and added Javadoc

* Nit-change on javadoc

* Checkstyle fix

* Improved test and appeased TeamCity

* Sorry, checkstyle

* Applied Jon's recommended changes

* Checkstyle fix

* Optimization

* Fixed tests

* Updated error message

* Added error message for UOE

* Renaming

* Finish rename

* Smarter limiting for pQueue method

* Optimized n-way merge strategy

* Rename segment limit -> segment partitions limit

* Added a bit of docs

* More comments

* Fix checkstyle and test

* Nit comment

* Fixed failing tests -> allow usage of all types of segment spec

* Fixed failing tests -> allow usage of all types of segment spec

* Revert "Fixed failing tests -> allow usage of all types of segment spec"

This reverts commit ec47028.

* Revert "Merge branch '6088-Time-Ordering-On-Scans-N-Way-Merge' of github.com:justinborromeo/incubator-druid into 6088-Time-Ordering-On-Scans-N-Way-Merge"

This reverts commit 57033f3, reversing
changes made to 8f01d8d.

* Check type of segment spec before using for time ordering

* Fix bug in numRowsScanned

* Fix bug messing up count of rows

* Fix docs and flipped boolean in ScanQueryLimitRowIterator

* Refactor n-way merge

* Added test for n-way merge

* Refixed regression

* Checkstyle and doc update

* Modified sequence limit to accept longs and added test for long limits

* doc fix

* Implemented Clint's recommendations
justinborromeo authored and jon-wei committed Mar 28, 2019
1 parent ffa9585 commit ad7862c
Showing 31 changed files with 1,861 additions and 196 deletions.
@@ -107,12 +107,15 @@ public class ScanBenchmark
@Param({"200000"})
private int rowsPerSegment;

@Param({"basic.A"})
@Param({"basic.A", "basic.B", "basic.C", "basic.D"})
private String schemaAndQuery;

@Param({"1000", "99999"})
private int limit;

@Param({"NONE", "DESCENDING", "ASCENDING"})
private static ScanQuery.Order ordering;

private static final Logger log = new Logger(ScanBenchmark.class);
private static final ObjectMapper JSON_MAPPER;
private static final IndexMergerV9 INDEX_MERGER_V9;
@@ -178,7 +181,8 @@ private static Druids.ScanQueryBuilder basicA(final BenchmarkSchemaInfo basicSch

return Druids.newScanQueryBuilder()
.dataSource("blah")
.intervals(intervalSpec);
.intervals(intervalSpec)
.order(ordering);
}

private static Druids.ScanQueryBuilder basicB(final BenchmarkSchemaInfo basicSchema)
@@ -197,7 +201,9 @@ private static Druids.ScanQueryBuilder basicB(final BenchmarkSchemaInfo basicSch

return Druids.newScanQueryBuilder()
.filters(filter)
.intervals(intervalSpec);
.dataSource("blah")
.intervals(intervalSpec)
.order(ordering);
}

private static Druids.ScanQueryBuilder basicC(final BenchmarkSchemaInfo basicSchema)
@@ -207,8 +213,10 @@ private static Druids.ScanQueryBuilder basicC(final BenchmarkSchemaInfo basicSch

final String dimName = "dimUniform";
return Druids.newScanQueryBuilder()
.filters(new SelectorDimFilter(dimName, "3", StrlenExtractionFn.instance()))
.intervals(intervalSpec);
.filters(new SelectorDimFilter(dimName, "3", StrlenExtractionFn.instance()))
.intervals(intervalSpec)
.dataSource("blah")
.order(ordering);
}

private static Druids.ScanQueryBuilder basicD(final BenchmarkSchemaInfo basicSchema)
@@ -220,8 +228,10 @@ private static Druids.ScanQueryBuilder basicD(final BenchmarkSchemaInfo basicSch
final String dimName = "dimUniform";

return Druids.newScanQueryBuilder()
.filters(new BoundDimFilter(dimName, "100", "10000", true, true, true, null, null))
.intervals(intervalSpec);
.filters(new BoundDimFilter(dimName, "100", "10000", true, true, true, null, null))
.intervals(intervalSpec)
.dataSource("blah")
.order(ordering);
}

@Setup
@@ -289,7 +299,8 @@ public void setup() throws IOException
config,
DefaultGenericQueryMetricsFactory.instance()
),
new ScanQueryEngine()
new ScanQueryEngine(),
new ScanQueryConfig()
);
}

@@ -30,11 +30,11 @@
final class LimitedSequence<T> extends YieldingSequenceBase<T>
{
private final Sequence<T> baseSequence;
private final int limit;
private final long limit;

LimitedSequence(
Sequence<T> baseSequence,
int limit
long limit
)
{
Preconditions.checkNotNull(baseSequence);
@@ -106,7 +106,7 @@ public void close() throws IOException

private class LimitedYieldingAccumulator<OutType, T> extends DelegatingYieldingAccumulator<OutType, T>
{
int count;
long count;
boolean interruptYield = false;

LimitedYieldingAccumulator(YieldingAccumulator<OutType, T> accumulator)
@@ -27,6 +27,7 @@
import java.util.PriorityQueue;

/**
* Used to perform an n-way merge on n ordered sequences
*/
public class MergeSequence<T> extends YieldingSequenceBase<T>
{
@@ -42,20 +43,18 @@ public MergeSequence(
this.baseSequences = (Sequence<? extends Sequence<T>>) baseSequences;
}

/*
Note: the yielder for MergeSequence returns elements from the priority queue in order of increasing priority.
This is due to the fact that PriorityQueue#remove() polls from the head of the queue which is, according to
the PriorityQueue javadoc, "the least element with respect to the specified ordering"
*/
@Override
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
{
PriorityQueue<Yielder<T>> pQueue = new PriorityQueue<>(
32,
ordering.onResultOf(
new Function<Yielder<T>, T>()
{
@Override
public T apply(Yielder<T> input)
{
return input.get();
}
}
(Function<Yielder<T>, T>) input -> input.get()
)
);

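To make the comment above concrete, here is a rough usage sketch (illustrative only, not part of this change); it assumes Guava's `Ordering.natural()` and `ImmutableList`, plus the existing `Sequences.simple(...)` helper, are available as they are elsewhere in the codebase:

```java
// Illustrative: n-way merge of three pre-sorted sequences into one sorted sequence.
// Assumed classes: com.google.common.collect.{Ordering, ImmutableList} and the
// Sequence/Sequences/MergeSequence classes shown in this diff.
Sequence<Integer> merged = new MergeSequence<Integer>(
    Ordering.natural(),
    Sequences.simple(
        ImmutableList.of(
            Sequences.simple(ImmutableList.of(1, 4, 7)),
            Sequences.simple(ImmutableList.of(2, 5, 8)),
            Sequences.simple(ImmutableList.of(3, 6, 9))
        )
    )
);
// merged.toList() yields [1, 2, 3, 4, 5, 6, 7, 8, 9]; each input sequence must already
// be sorted by the same ordering for the merge to be correct.
```
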
@@ -54,13 +54,13 @@ public interface Sequence<T>
<OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator);

/**
* Return an Yielder for accumulated sequence.
* Return a Yielder for accumulated sequence.
*
* @param initValue the initial value to pass along to start the accumulation.
* @param accumulator the accumulator which is responsible for accumulating input values.
* @param <OutType> the type of accumulated value.
*
* @return an Yielder for accumulated sequence.
* @return a Yielder for accumulated sequence.
*
* @see Yielder
*/
@@ -72,6 +72,8 @@ default <U> Sequence<U> map(Function<? super T, ? extends U> mapper)
}

/**
* This will materialize the entire sequence. Use at your own risk.
*
* Several benchmarks rely on this method to eagerly accumulate Sequences to ArrayLists. e.g.
* GroupByBenchmark.
*/
@@ -80,7 +82,7 @@ default List<T> toList()
return accumulate(new ArrayList<>(), Accumulators.list());
}

default Sequence<T> limit(int limit)
default Sequence<T> limit(long limit)
{
return new LimitedSequence<>(this, limit);
}
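A small illustration of what the widened `limit` signature allows (hypothetical snippet; `bigIterableOfRows` is a placeholder for any very large iterable):

```java
// Illustrative: limits larger than Integer.MAX_VALUE are now expressible directly.
Sequence<Long> rows = Sequences.simple(bigIterableOfRows);
Sequence<Long> firstThreeBillion = rows.limit(3_000_000_000L); // not representable as an int
```
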
97 changes: 53 additions & 44 deletions docs/content/querying/scan-query.md
@@ -24,7 +24,16 @@ title: "Scan query"

# Scan query

Scan query returns raw Druid rows in streaming mode.
The Scan query returns raw Druid rows in streaming mode. The biggest difference between the Select query and the Scan
query is that the Scan query does not retain all the returned rows in memory before they are returned to the client.
The Select query _will_ retain the rows in memory, causing memory pressure if too many rows are returned.
The Scan query can return all the rows without issuing another pagination query.

In addition to straightforward usage where a Scan query is issued to the Broker, the Scan query can also be issued
directly to Historical processes or streaming ingestion tasks. This can be useful if you want to retrieve large
amounts of data in parallel.

An example Scan query object is shown below:

```json
{
@@ -36,28 +45,29 @@
"2013-01-01/2013-01-02"
],
"batchSize":20480,
"limit":5
"limit":3
}
```

There are several main parts to a scan query:
The following are the main parameters for Scan queries:

|property|description|required?|
|--------|-----------|---------|
|queryType|This String should always be "scan"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|resultFormat|How result represented, list or compactedList or valueVector. Currently only `list` and `compactedList` are supported. Default is `list`|no|
|resultFormat|How the results are represented: list, compactedList or valueVector. Currently only `list` and `compactedList` are supported. Default is `list`|no|
|filter|See [Filters](../querying/filters.html)|no|
|columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no|
|batchSize|How many rows buffered before return to client. Default is `20480`|no|
|limit|How many rows to return. If not specified, all rows will be returned.|no|
|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the limit is less than `druid.query.scan.maxRowsQueuedForOrdering`. Scan queries that are either in legacy mode or have a limit greater than `druid.query.scan.maxRowsQueuedForOrdering` will not be time-ordered and will default to an order of "none".|no|
|legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no|
|context|An additional JSON Object which can be used to specify certain flags.|no|
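For instance, a Scan query that asks for the newest rows first might look like the following (the datasource name and interval are placeholders):

```json
{
   "queryType": "scan",
   "dataSource": "wikipedia",
   "resultFormat": "compactedList",
   "columns": [],
   "intervals": ["2013-01-01/2013-01-02"],
   "batchSize": 20480,
   "limit": 3,
   "order": "descending"
}
```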

## Example results

The format of the result when resultFormat equals to `list`:
The format of the result when resultFormat equals `list`:

```json
[{
@@ -123,41 +133,11 @@ The format of the result when resultFormat equals to `list`:
"delta" : 77.0,
"variation" : 77.0,
"deleted" : 0.0
}, {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "113_U.S._73",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 70.0,
"delta" : 70.0,
"variation" : 70.0,
"deleted" : 0.0
}, {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "113_U.S._756",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 68.0,
"delta" : 68.0,
"variation" : 68.0,
"deleted" : 0.0
} ]
} ]
```

The format of the result when resultFormat equals to `compactedList`:
The format of the result when resultFormat equals `compactedList`:

```json
[{
@@ -168,18 +148,39 @@ The format of the result when resultFormat equals to `compactedList`:
"events" : [
["2013-01-01T00:00:00.000Z", "1", "article", "0", "0", "11._korpus_(NOVJ)", "sl", "0", "EmausBot", 1.0, 39.0, 39.0, 39.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "112_U.S._580", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._243", "en", "1", "MZMcBride", 1.0, 77.0, 77.0, 77.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._73", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._756", "en", "1", "MZMcBride", 1.0, 68.0, 68.0, 68.0, 0.0]
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._243", "en", "1", "MZMcBride", 1.0, 77.0, 77.0, 77.0, 0.0]
]
} ]
```

The biggest difference between select query and scan query is that, scan query doesn't retain all rows in memory before rows can be returned to client.
It will cause memory pressure if too many rows required by select query.
Scan query doesn't have this issue.
Scan query can return all rows without issuing another pagination query, which is extremely useful when query against Historical or realtime process directly.

## Time Ordering

The Scan query currently supports ordering based on timestamp for non-legacy queries. Note that using time ordering
will yield results that do not indicate which segment rows are from (`segmentId` will show up as `null`). Furthermore,
time ordering is only supported where the result set limit is less than `druid.query.scan.maxRowsQueuedForOrdering`
rows **or** all segments scanned have fewer than `druid.query.scan.maxSegmentPartitionsOrderedInMemory` partitions. Also,
time ordering is not supported for queries issued directly to Historical processes unless a list of segments is specified. The
reasoning behind these limitations is that the implementation of time ordering uses two strategies that can consume too
much heap memory if left unbounded. These strategies (listed below) are chosen on a per-Historical basis depending on
the query result set limit and the number of segments being scanned.

1. Priority Queue: Each segment on a Historical is opened sequentially. Every row is added to a bounded priority
queue which is ordered by timestamp. For every row above the result set limit, the row with the earliest (if descending)
or latest (if ascending) timestamp will be dequeued. After every row has been processed, the sorted contents of the
priority queue are streamed back to the Broker(s) in batches. Attempting to load too many rows into memory runs the
risk of Historical nodes running out of memory. The `druid.query.scan.maxRowsQueuedForOrdering` property protects
from this by limiting the number of rows in the query result set when time ordering is used. A simplified sketch of
this approach appears after this list.

2. N-Way Merge: For each segment, each partition is opened in parallel. Since each partition's rows are already
time-ordered, an n-way merge can be performed on the results from each partition. This approach doesn't persist the entire
result set in memory (like the Priority Queue) as it streams back batches as they are returned from the merge function.
However, attempting to query too many partitions could also result in high memory usage due to the need to open
decompression and decoding buffers for each. The `druid.query.scan.maxSegmentPartitionsOrderedInMemory` limit protects
from this by capping the number of partitions opened at any time when time ordering is used.
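The following is a simplified, self-contained sketch of the bounded priority-queue idea described in strategy 1. It is not the actual Druid implementation, and every name in it is invented:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Keeps at most `limit` (timestamp, rowId) pairs, evicting the row that can no
// longer appear in the limited, time-ordered result.
class BoundedTimeOrderedBuffer
{
  private final int limit;
  private final PriorityQueue<long[]> queue;   // each entry: {timestamp, rowId}

  BoundedTimeOrderedBuffer(int limit, boolean descending)
  {
    this.limit = limit;
    Comparator<long[]> byTimestamp = Comparator.comparingLong(row -> row[0]);
    // Descending results keep the latest rows, so evict the earliest (min-heap at head);
    // ascending results keep the earliest rows, so evict the latest (max-heap at head).
    this.queue = new PriorityQueue<>(descending ? byTimestamp : byTimestamp.reversed());
  }

  void offer(long timestamp, long rowId)
  {
    queue.offer(new long[]{timestamp, rowId});
    if (queue.size() > limit) {
      queue.poll();   // drop the row that falls outside the limit
    }
  }

  List<long[]> drain()
  {
    List<long[]> rows = new ArrayList<>(queue.size());
    while (!queue.isEmpty()) {
      rows.add(queue.poll());
    }
    // The heap's poll order starts from the "most evictable" end, which is the reverse
    // of the requested ordering, so flip it before batching and streaming the rows back.
    Collections.reverse(rows);
    return rows;
  }
}
```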

Both `druid.query.scan.maxRowsQueuedForOrdering` and `druid.query.scan.maxSegmentPartitionsOrderedInMemory` are
configurable and can be tuned based on hardware specs and number of dimensions being queried.

## Legacy mode

The Scan query supports a legacy mode designed for protocol compatibility with the former scan-query contrib extension.
@@ -194,3 +195,11 @@
Legacy mode can be triggered either by passing `"legacy" : true` in your query JSON, or setting
`druid.query.scan.legacy = true` on your Druid processes. If you were previously using the scan-query contrib extension,
the best way to migrate is to activate legacy mode during a rolling upgrade, then switch it off after the upgrade
is complete.
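For example, a query forced into legacy mode on a per-query basis might look like this (the datasource and interval are placeholders):

```json
{
   "queryType": "scan",
   "dataSource": "wikipedia",
   "intervals": ["2013-01-01/2013-01-02"],
   "limit": 3,
   "legacy": true
}
```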

## Configuration Properties

|property|description|values|default|
|--------|-----------|------|-------|
|druid.query.scan.maxRowsQueuedForOrdering|The maximum number of rows returned when time ordering is used|An integer in [0, 2147483647]|100000|
|druid.query.scan.maxSegmentPartitionsOrderedInMemory|The maximum number of segments scanned per historical when time ordering is used|An integer in [0, 2147483647]|50|
|druid.query.scan.legacy|Whether legacy mode should be turned on for Scan queries|true or false|false|
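These are ordinary runtime properties, so they can be set on the relevant processes like any other configuration; the values below simply restate the defaults as an example:

```properties
druid.query.scan.maxRowsQueuedForOrdering=100000
druid.query.scan.maxSegmentPartitionsOrderedInMemory=50
druid.query.scan.legacy=false
```
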
@@ -2586,7 +2586,8 @@ public void registerQuery(Query query, ListenableFuture future)
new ScanQueryConfig(),
new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
),
new ScanQueryEngine()
new ScanQueryEngine(),
new ScanQueryConfig()
)
)
.build()
16 changes: 13 additions & 3 deletions processing/src/main/java/org/apache/druid/query/Druids.java
@@ -918,12 +918,13 @@ public static class ScanQueryBuilder
private QuerySegmentSpec querySegmentSpec;
private VirtualColumns virtualColumns;
private Map<String, Object> context;
private String resultFormat;
private ScanQuery.ResultFormat resultFormat;
private int batchSize;
private long limit;
private DimFilter dimFilter;
private List<String> columns;
private Boolean legacy;
private ScanQuery.Order order;

public ScanQueryBuilder()
{
@@ -937,6 +938,7 @@ public ScanQueryBuilder()
dimFilter = null;
columns = new ArrayList<>();
legacy = null;
order = null;
}

public ScanQuery build()
Expand All @@ -948,6 +950,7 @@ public ScanQuery build()
resultFormat,
batchSize,
limit,
order,
dimFilter,
columns,
legacy,
@@ -967,7 +970,8 @@ public static ScanQueryBuilder copy(ScanQuery query)
.filters(query.getFilter())
.columns(query.getColumns())
.legacy(query.isLegacy())
.context(query.getContext());
.context(query.getContext())
.order(query.getOrder());
}

public ScanQueryBuilder dataSource(String ds)
@@ -1005,7 +1009,7 @@ public ScanQueryBuilder context(Map<String, Object> c)
return this;
}

public ScanQueryBuilder resultFormat(String r)
public ScanQueryBuilder resultFormat(ScanQuery.ResultFormat r)
{
resultFormat = r;
return this;
@@ -1046,6 +1050,12 @@ public ScanQueryBuilder legacy(Boolean legacy)
this.legacy = legacy;
return this;
}

public ScanQueryBuilder order(ScanQuery.Order order)
{
this.order = order;
return this;
}
}

public static ScanQueryBuilder newScanQueryBuilder()
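As a rough usage sketch (not taken from this patch; the datasource, interval, limit, and result format are placeholders, and `MultipleIntervalSegmentSpec` and `Intervals` are assumed from the existing codebase), the new `order()` setter slots into the builder chain like this:

```java
// Illustrative: build a descending, time-ordered ScanQuery with the new order() method.
ScanQuery query = Druids.newScanQueryBuilder()
    .dataSource("blah")
    .intervals(
        new MultipleIntervalSegmentSpec(
            Collections.singletonList(Intervals.of("2013-01-01/2013-01-02"))
        )
    )
    .limit(1000)
    .order(ScanQuery.Order.DESCENDING)
    .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
    .build();
```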