
streaming version of select query #3307

Merged
merged 12 commits into apache:master from the ScanQuery branch on Jan 19, 2017

Conversation

kaijianding
Contributor

@kaijianding kaijianding commented Aug 1, 2016

It is part of this discussion https://groups.google.com/d/msg/druid-development/FK5D162ao74/13F3ixfJEQAJ

This PR is a streaming version of select.
The query is similar to select query

{
    "queryType": "scan",
    "columns": [
        "q_sales","pv"
    ],
    "dataSource": "dws_tb_log_wl_100w_shard5",
    "limit": 10,
    "filter": {
        "dimension": "q_sales",
        "value": "55",
        "type": "selector"
    },
    "intervals": [
        "2015-01-01/2016-12-07"
    ]
}

The differences between the select query and the scan query are:

  1. No pagingSpec: the scan query reads events from the beginning to the end.
  2. The select query returns one object containing a huge event list, like {[a,b,c,d,e,f,...]}; the scan query returns a list of small batches of events, like [ [a,b,c], [d,e,f], [...] ] (see the sketch after this list).
  3. The select query costs a lot of memory because it has to buffer a huge list of events in memory and only flushes once the whole list is ready. The scan query flushes whenever a small batch is ready, so the client can consume one batch while the server prepares the next.
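For illustration only, a scan response is a list of small batches, each wrapped in a segmentId/offset/events envelope as described later in this thread (the concrete values below are made up):

[
    {
        "segmentId": "some_segment_id_0",
        "offset": 3,
        "events": [
            {"timestamp": "2015-01-01T00:00:00.000Z", "q_sales": "55", "pv": 12},
            {"timestamp": "2015-01-01T00:00:00.000Z", "q_sales": "55", "pv": 7}
        ]
    },
    {
        "segmentId": "some_segment_id_1",
        "offset": 3,
        "events": [ "..." ]
    }
]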

It has been tested by talking directly to historicals, and not fully tested from the broker side.

@KurtYoung
Contributor

Do we need granularity for the scan query?

@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
Contributor

missing license header

@navis
Contributor

navis commented Aug 3, 2016

Can we do this by adding batchSize to existing select query?

@kaijianding kaijianding force-pushed the ScanQuery branch 2 times, most recently from 5acde1c to 4b20b35 on August 3, 2016 at 15:21
@kaijianding
Contributor Author

The scan query is different from the select query in many ways, not only the batchSize:
e.g. the return structure, the way events are held in memory, and the way merged results are handled in the toolchest.
Also, the scan query doesn't promise global ordering across segments according to query.descending, so it can return a response to the client as soon as possible.

So I think it's better to create a new query type instead of reusing the select query.
@navis

@gianm
Contributor

gianm commented Aug 3, 2016

Thanks @kaijianding for posting this!

To me, the differences (result format, lack of ordering, different processing style) are substantial enough that this should be a different query type from select.

We might also want to:

  • Remove the dimension/metric distinction, so the user can just ask for "columns". The dimension/metric split really only makes sense if we're aggregating, and Scan does not aggregate. The return format could be dictated by the underlying column type, possibly overridden by a hint provided by the user. I'm not sure if we need to support extractionFns, but if we do, they could be provided explicitly instead of as part of DimensionSpecs. (That way, they'd even work on non-string columns!)
  • Maybe or maybe not keep "granularity", although if we do keep it, I think we shouldn't pass it to makeCursors. I think we only need to use it to truncate timestamps in the returned rows, and if so then splitting up the cursors by granularity bucket is extra overhead.
  • Would be nice to do what we need to do (if anything) to enable a future format change for the batches from JSON/Smile arrays into column-oriented binary form like Arrow or ValueVector. It'd be great if a future patch could change that for Scan through just a header or a flag on the query.

@@ -108,6 +108,7 @@
<module>extensions-contrib/parquet-extensions</module>
<module>extensions-contrib/statsd-emitter</module>
<module>extensions-contrib/orc-extensions</module>
<module>extensions-contrib/scan-query</module>
Contributor

nit: I heard this list is supposed to be sorted. orc-extensions was already out of order, but we can fix them all together here.

Contributor Author

Besides orc-extensions, I found more exceptions in the list, so I think ordering is not a requirement.

Contributor

it is nice if this is sorted :)

@fjy fjy added this to the 0.9.3 milestone Aug 30, 2016
@gianm
Contributor

gianm commented Aug 30, 2016

@kaijianding, do you have any thoughts on #3307 (comment)?

@kaijianding
Contributor Author

@gianm
For #1, I agree we can use columns instead; if the user wants to do extraction, they can do it in their own code after they get the data. Even if they do want the extraction in Druid, we can add the function back in another PR.
For #2, we can always use QueryGranularities.ALL and remove the granularity argument from the query.
For #3, I define ScanQuery extends BaseQuery<ScanResultValue>, and the events data is wrapped inside ScanResultValue. I can change event to type Object, so this Object can be a List, ValueVector, or Arrow. Another thing is that it needs an HTTP request header to tell the QueryResource to use some more binary-friendly protocol to transfer things like [{"segmentId":"xxx", "offset":offset, "events": "the binary here (List or ValueVector or Arrow)"}].
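A minimal sketch of that envelope, based only on the description above and the snippet shown further down in this review (field names segmentId/offset/events come from the example here; the real class in the PR may differ):

public class ScanResultValue implements Comparable<ScanResultValue>
{
  public static final String timestampKey = "timestamp";

  private final String segmentId;
  private final int offset;
  // typed as Object so a batch can hold a List today and a ValueVector/Arrow buffer later
  private final Object events;

  public ScanResultValue(String segmentId, int offset, Object events)
  {
    this.segmentId = segmentId;
    this.offset = offset;
    this.events = events;
  }

  public String getSegmentId()
  {
    return segmentId;
  }

  public int getOffset()
  {
    return offset;
  }

  public Object getEvents()
  {
    return events;
  }

  @Override
  public int compareTo(ScanResultValue that)
  {
    // ordering across segments is by segmentId, as discussed later in this thread
    return segmentId.compareTo(that.segmentId);
  }
}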

@fjy
Contributor

fjy commented Aug 31, 2016

@kaijianding do you have any benchmarks around how fast data can be read using this query? Can we compare it against select? I suspect the difference will be substantial

@JsonProperty("limit") int limit,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("columns") List<String> columns,
@JsonProperty("context") Map<String, Object> context
Contributor

we will need documentation similar to the select query on how to use the scan query

@fjy
Contributor

fjy commented Aug 31, 2016

I looked this over and it looks very good to me.

One main comment is that the query doesn't use the processing pool and hence will not respect query priorities and may have multi-tenancy impacts, but I am still in favor of the approach taken. I think we need to note that in the docs at a minimum.

👍 after docs are added

I almost want to move this up to 0.9.2 :P

@fjy
Contributor

fjy commented Aug 31, 2016

@kaijianding @KurtYoung is this still WIP? It looks ready

@kaijianding
Contributor Author

Still figuring out how to do the global limit from the broker side, to avoid sending requests to more historicals/realtimes if data from those servers definitely won't appear in the final results after the final merge (usually the first server can provide enough results if query.limit is set). In other words, can the broker send requests to servers in sequence instead of in parallel?
Besides the global limit from the broker side, this PR is functionally ready, but it still needs UTs and documentation, and maybe the caching strategy part of the code (do we really need the caching strategy code for the scan query?).
@fjy

@fjy
Contributor

fjy commented Sep 7, 2016

@kaijianding no, we don't need caching for scan query IMO. A large result set will just fill up the entire cache.

}

private Object rowsToValueVector() {
// only support list now, we can support ValueVector or Arrow in future
Contributor

UnsupportedOperationException is probably safer, otherwise callers will get things they don't expect

@gianm
Contributor

gianm commented Sep 7, 2016

@kaijianding IMO if there's a global limit it's fine to hit every data node and just throw away some data past the limit. This isn't optimal performance-wise, but I think that the common use case for scan is likely to be with no limit and/or with hitting data nodes directly. So, I think it's okay if we don't fully optimize for the case where there is a limit and the query hits a broker.

I think caching is unnecessary, so we can just return a null caching strategy.

Docs and tests are important though.

@fjy I'm ok with committing to this as a core feature of druid rather than a contrib extension if the authors are into that. It seems like it'd be useful for integrating Druid with other systems.

@gianm
Contributor

gianm commented Oct 3, 2016

@kaijianding have you had a chance to work on this recently? As mentioned in #3307 (comment), I think it's ok to pass the global limit down to each data node for the first version of the broker-side query. IMO it would be nice to get something working here in master and then improve on it.

@kaijianding kaijianding changed the title [WIP] streaming version of select query streaming version of select query Jan 16, 2017
@kaijianding
Contributor Author

@fjy it is ready for review now

@kaijianding
Contributor Author

@KenjiTakahashi it needs a change to QueryResource to return line-delimited JSON batches, maybe in another PR.
Also I added a new resultFormat called compactedList. In this format the result has far fewer bytes and should improve JSON ser/deser speed, memory usage, and network transfer speed. It is useful if you only need the values of each row (currently, the map keys are redundant). You can give it a try.
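To illustrate the difference (assuming compactedList simply drops the per-row keys and sends value arrays in a fixed column order; the exact envelope may differ):

With resultFormat "list", every row repeats its keys:

"events": [
    {"timestamp": "2015-01-01T00:00:00.000Z", "q_sales": "55", "pv": 12},
    {"timestamp": "2015-01-01T00:00:00.000Z", "q_sales": "55", "pv": 7}
]

With resultFormat "compactedList", each row is just an array of values:

"events": [
    ["2015-01-01T00:00:00.000Z", "55", 12],
    ["2015-01-01T00:00:00.000Z", "55", 7]
]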

@fjy fjy assigned himanshug and fjy Jan 17, 2017
@KenjiTakahashi
Contributor

@kaijianding Agreed that this can be considered a separate feature, for another PR.

I've tried the "compactedList" format and it is indeed noticeably (not much, but still) faster than the other one.

Thanks again for working on this, we really appreciate it!

}

// at the point where this code is called, only one datasource should exist.
String dataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
Contributor

Preconditions.checkArgument(..) instead, looks like dataSource isn't used and this line is there only for validation?

Contributor Author

It's unused code, will remove it.

allDims.retainAll(query.getColumns());
allMetrics.retainAll(query.getColumns());
}
if (query.getColumns() == null || query.getColumns().isEmpty()) {
Contributor

Isn't this the else clause for the preceding if? Why not else {...}?

Contributor Author

yes.
Will change to else.

filter,
intervals.get(0),
VirtualColumns.EMPTY,
QueryGranularities.ALL,
Contributor

not sure why this is truncating the timestamps completely... did you mean QueryGranularities.NONE really?

Contributor Author

It should be ALL. I only want 1 cursor, so I can read all data in one iterator.
If it is NONE, it will create 1 cursor per millisecond; see CursorSequenceBuilder: Iterable<Long> iterable = gran.iterable(interval.getStartMillis(), interval.getEndMillis()); and BaseQueryGranularity.iterable().
AllGranularity.iterable() returns a single-element iterable, which is what I actually need.
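A tiny sketch of that point, using the iterable(start, end) call quoted above (package names are assumed from the 0.9.x codebase; this is illustration, not code from the PR):

import io.druid.granularity.QueryGranularity;
import org.joda.time.Interval;

public class GranularityBucketCount
{
  // Counts the granularity buckets for an interval; CursorSequenceBuilder creates one
  // cursor per bucket. QueryGranularities.ALL yields a single bucket, while
  // QueryGranularities.NONE yields one bucket per millisecond of the interval.
  public static int cursorCount(QueryGranularity gran, Interval interval)
  {
    int buckets = 0;
    for (Long ignored : gran.iterable(interval.getStartMillis(), interval.getEndMillis())) {
      buckets++;
    }
    return buckets;
  }
}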

events = rowsToList();
}
responseContext.put("count", (int) responseContext.get("count") + (offset - lastOffset));
responseContext.put("timeoutAt", timeoutAt - (System.currentTimeMillis() - start));
Contributor

nit: "count" and "timeoutAt" can be pulled into a static variable somewhere

Contributor Author

Modified.


public class ScanResultValue implements Comparable<ScanResultValue>
{
public static final String timestampKey = "timestamp";
Contributor

in other places, we use __time e.g. in groupBy queries etc. It might make sense to be consistent. Also, you can probably refer to Column.TIME_COLUMN_NAME

Contributor Author

I followed the select query: io.druid.query.select.EventHolder.timestampKey
Do you think it is better to use __time?

Contributor

Didn't realize that the select query was using "timestamp"; I guess it's better to be consistent with the select query, so it's fine.


@himanshug why didn't you use "__time" instead of "timestamp"?

final Iterable<QueryRunner<ScanResultValue>> queryRunners
)
{
// in single thread and in jetty thread instead of processing thread
Contributor

Any reason for not using the queryExecutor, which would scan segments in parallel and follow the query cancellation behavior?

I guess, you've done it to avoid parallelization so that you can limit the number of rows across segments. In that case, you can still use queryExecutor but give it single callable that scans all segments and does check for thread interrupted before scanning each segment and abort if thread was interrupted.

Contributor Author

I want to limit memory usage when scanning segments; memory is the pain point of the select query, which buffers too much data in memory before returning results to the client.
So I decided to prepare the next batch only when the client asks for it.

Another reason for using qtp instead of queryExecutor is that processing threads are valuable and usually limited in number (because direct memory is configured per processing thread). A scan query usually lasts a very long time and uses a qtp thread anyway, so I think it is not a good idea to also occupy a processing thread for a very long time, which may block other queries.
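A hypothetical sketch of that "prepare the next batch only when the client asks for it" idea, independent of the actual classes in this PR: rows are pulled from the underlying cursor lazily, so at most one batch is held in memory at a time.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class BatchingIterator<T> implements Iterator<List<T>>
{
  private final Iterator<T> rows;   // e.g. rows read from a segment cursor
  private final int batchSize;

  public BatchingIterator(Iterator<T> rows, int batchSize)
  {
    this.rows = rows;
    this.batchSize = batchSize;
  }

  @Override
  public boolean hasNext()
  {
    return rows.hasNext();
  }

  @Override
  public List<T> next()
  {
    if (!rows.hasNext()) {
      throw new NoSuchElementException();
    }
    // build one small batch on demand; nothing beyond this batch is buffered
    List<T> batch = new ArrayList<>(batchSize);
    while (rows.hasNext() && batch.size() < batchSize) {
      batch.add(rows.next());
    }
    return batch;
  }
}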

Contributor

With a processing thread you would reserve a ByteBuffer, but beyond that, memory usage could stay the same as in your current implementation.
OK, I see the argument about not taking up a processing thread for a long time so they stay available for other regular queries... then query cancellation does not work right now, but that does not matter if you only run the scan query directly against the historical.

@himanshug
Contributor

@kaijianding it appears that if I send a query with a limit specified to the broker, then it would give me different results on different calls, depending on which historicals responded first to the broker? In that case we should at least note that in the doc.
Also, we have the potential to optimize and cancel queries to the other historicals which haven't responded yet.

Or maybe remove the limit at the broker, and then the limit really becomes "per historical".

@kaijianding
Contributor Author

The CachingClusteredClient will sort the sequences by query.getResultOrdering(); for the scan query that depends on the ordering of segmentId (see ScanResultValue.compareTo()), so the broker limit should always return the same result.
After the broker gets enough data, it calls yielder.close(), which I expect will call JsonParserIterator.close() for each sequence and interrupt the qtp thread on the historicals, which actually cancels the next batch on the historicals. @himanshug

@himanshug himanshug merged commit 33ae9dd into apache:master Jan 19, 2017
@kaijianding kaijianding deleted the ScanQuery branch January 20, 2017 02:34
dgolitsyn pushed a commit to metamx/druid that referenced this pull request Feb 14, 2017
* streaming version of select query

* use columns instead of dimensions and metrics;prepare for valueVector;remove granularity

* respect query limit within historical

* use constant

* fix thread name corrupted bug when using jetty qtp thread rather than processing thread while working with SpecificSegmentQueryRunner

* add some test for scan query

* add scan query document

* fix merge conflicts

* add compactedList resultFormat, this format is better for json ser/der

* respect query timeout

* respect query limit on broker

* use static consts and remove unused code