Merged
7 changes: 2 additions & 5 deletions docs/content/Broker-Config.md
@@ -41,7 +41,8 @@ Druid uses Jetty to serve HTTP requests.

#### Processing

The broker only uses processing configs for nested groupBy queries.
The broker uses processing configs for nested groupBy queries. Optionally, long-interval queries (of any type) can be broken into shorter-interval queries and processed in parallel in this thread pool. For more details, see "chunkPeriod" in the [Querying](Querying.html) documentation.


|Property|Description|Default|
|--------|-----------|-------|
@@ -52,10 +53,6 @@ The broker only uses processing configs for nested groupBy queries.

#### General Query Configuration

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.chunkPeriod`|Long-interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. |0 (off)|

##### GroupBy Query Config

|Property|Description|Default|
2 changes: 2 additions & 0 deletions docs/content/Querying.md
@@ -145,6 +145,8 @@ Properties shared by all query types
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overridden in the broker or historical node configuration |
|bySegment | `false` | Return "by segment" results. Primarily used for debugging. Setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `0` (off) | At the broker, long-interval queries (of any type) may be broken into shorter-interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year is broken into 12 smaller queries. All query chunks are processed asynchronously inside the query processing executor service. Make sure `druid.processing.numThreads` is configured appropriately. |
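The "year split by `P1M` becomes 12 queries" behavior described above can be sketched with plain `java.time` (Druid itself uses Joda-Time and `PeriodGranularity`; this is an illustrative model, not the actual implementation):

```java
import java.time.Period;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;

public class ChunkPeriodExample
{
  // Split [start, end) into consecutive chunks of the given ISO 8601 period,
  // mirroring what the broker does when "chunkPeriod" is set in the query context.
  static List<ZonedDateTime[]> split(ZonedDateTime start, ZonedDateTime end, Period chunk)
  {
    List<ZonedDateTime[]> out = new ArrayList<>();
    ZonedDateTime cur = start;
    while (cur.isBefore(end)) {
      ZonedDateTime next = cur.plus(chunk);
      if (next.isAfter(end)) {
        next = end; // the last chunk may be shorter than the period
      }
      out.add(new ZonedDateTime[]{cur, next});
      cur = next;
    }
    return out;
  }

  public static void main(String[] args)
  {
    ZonedDateTime start = ZonedDateTime.of(2015, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
    // "P1M" over a one-year interval yields 12 sub-queries
    System.out.println(split(start, start.plusYears(1), Period.parse("P1M")).size()); // prints 12
  }
}
```

Each chunk is then issued as its own sub-query, which is why `druid.processing.numThreads` matters for throughput.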
Contributor:

can you double check the broker docs for druid.processing.numThreads? I think they will need to be updated as well

Contributor (author):

done

Member:

Can we document how the chunkPeriod context parameter interacts with the existing druid.query.chunkPeriod and druid.query.<queryType>.chunkPeriod configuration parameter?

Contributor (author):

Actually, it replaces `druid.query.chunkPeriod` and `druid.query.<queryType>.chunkPeriod`; they are no longer valid after this pull request. In our experience, the chunking behavior really needs to be tuned per query, sometimes based on the size of its interval.

Member:

In that case, can we remove the old configs from the docs and code as well, instead of keeping unused config around?



Query Cancellation
------------------
@@ -84,7 +84,8 @@ public ByteBuffer get()
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, mapper, engine, pool),
new GroupByQueryQueryToolChest(configSupplier, mapper, engine, pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
pool
);

@@ -105,7 +106,8 @@ public boolean isSingleThreaded()
singleThreadEngine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
singleThreadedConfigSupplier,
new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool),
new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
pool
);

@@ -61,7 +61,7 @@ public static Collection<?> constructorFeeder() throws IOException
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
@@ -79,7 +79,7 @@ public ByteBuffer get()
}
}
),
new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
83 changes: 83 additions & 0 deletions processing/src/main/java/io/druid/query/AsyncQueryRunner.java
@@ -0,0 +1,83 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.guava.LazySequence;
import com.metamx.common.guava.Sequence;

public class AsyncQueryRunner<T> implements QueryRunner<T>
{

private final QueryRunner<T> baseRunner;
private final ListeningExecutorService executor;
private final QueryWatcher queryWatcher;

public AsyncQueryRunner(QueryRunner<T> baseRunner, ExecutorService executor, QueryWatcher queryWatcher) {
this.baseRunner = baseRunner;
this.executor = MoreExecutors.listeningDecorator(executor);
this.queryWatcher = queryWatcher;
}

@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final int priority = query.getContextPriority(0);
final ListenableFuture<Sequence<T>> future = executor.submit(new AbstractPrioritizedCallable<Sequence<T>>(priority)
{
@Override
public Sequence<T> call() throws Exception
{
//Note: it is assumed that baseRunner does most of its work eagerly on the call to
//run() and that accumulating/yielding the resulting sequence is fast.
return baseRunner.run(query, responseContext);
}
});
queryWatcher.registerQuery(query, future);

return new LazySequence<>(new Supplier<Sequence<T>>()
{
@Override
public Sequence<T> get()
{
try {
Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT);
if (timeout == null) {
return future.get();
} else {
return future.get(timeout.longValue(), TimeUnit.MILLISECONDS);
}
} catch (ExecutionException | InterruptedException | TimeoutException ex) {
throw Throwables.propagate(ex);
}
}
});
}
}
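`AsyncQueryRunner` submits work eagerly but defers blocking (with an optional timeout) until the sequence is consumed. The same submit-now, block-on-first-read pattern can be sketched with only `java.util.concurrent` (names here are illustrative, not Druid APIs; the real class uses Guava's `ListenableFuture` and registers the future with a `QueryWatcher`):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class AsyncPatternSketch
{
  // Submit the work immediately, but return a Supplier that only blocks
  // (optionally with a timeout) when the caller asks for the result --
  // analogous to wrapping the Future in a LazySequence above.
  static Supplier<String> submitLazily(ExecutorService exec, Supplier<String> work, Long timeoutMillis)
  {
    Future<String> future = exec.submit(work::get);
    return () -> {
      try {
        return timeoutMillis == null
               ? future.get()
               : future.get(timeoutMillis, TimeUnit.MILLISECONDS);
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
    };
  }

  public static void main(String[] args)
  {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Supplier<String> result = submitLazily(exec, () -> "chunk-result", 1000L);
    System.out.println(result.get()); // blocks here, not at submit time
    exec.shutdown();
  }
}
```

This is what lets all query chunks start executing in parallel while the broker concatenates their results one at a time.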
@@ -22,6 +22,9 @@
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;

import io.druid.granularity.PeriodGranularity;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import org.joda.time.Interval;
@@ -31,57 +34,90 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
*/
public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
private final Period period;

public IntervalChunkingQueryRunner(QueryRunner<T> baseRunner, Period period)
private final QueryToolChest<T, Query<T>> toolChest;
private final ExecutorService executor;
private final QueryWatcher queryWatcher;
private final ServiceEmitter emitter;

public IntervalChunkingQueryRunner(QueryRunner<T> baseRunner, QueryToolChest<T, Query<T>> toolChest,
ExecutorService executor, QueryWatcher queryWatcher, ServiceEmitter emitter)
{
this.baseRunner = baseRunner;
this.period = period;
this.toolChest = toolChest;
this.executor = executor;
this.queryWatcher = queryWatcher;
this.emitter = emitter;
}

@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
if (period.getMillis() == 0) {
final Period chunkPeriod = getChunkPeriod(query);
if (chunkPeriod.toStandardDuration().getMillis() == 0) {
return baseRunner.run(query, responseContext);
}

    List<Interval> chunkIntervals = Lists.newArrayList(
        FunctionalIterable
            .create(query.getIntervals())
            .transformCat(
                new Function<Interval, Iterable<Interval>>()
                {
                  @Override
                  public Iterable<Interval> apply(Interval input)
                  {
                    return splitInterval(input, chunkPeriod);
                  }
                }
            )
    );

    if (chunkIntervals.size() <= 1) {
      return baseRunner.run(query, responseContext);
    }

    final QueryRunner<T> finalQueryRunner = new AsyncQueryRunner<T>(
        //Note: it is assumed that toolChest.mergeResults(..) gives a query runner that is
        //not lazy, i.e. it does most of its work on the call to its run() method
        toolChest.mergeResults(
            new MetricsEmittingQueryRunner<T>(
                emitter,
                new Function<Query<T>, ServiceMetricEvent.Builder>()
                {
                  @Override
                  public ServiceMetricEvent.Builder apply(Query<T> input)
                  {
                    return toolChest.makeMetricBuilder(input);
                  }
                },
                baseRunner
            ).withWaitMeasuredFromNow()
        ),
        executor, queryWatcher
    );

    return Sequences.concat(
        Lists.newArrayList(
            FunctionalIterable.create(chunkIntervals).transform(
                new Function<Interval, Sequence<T>>()
                {
                  @Override
                  public Sequence<T> apply(Interval singleInterval)
                  {
                    return finalQueryRunner.run(
                        query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))),
                        responseContext
                    );
                  }
                }
            )
        )
    );
  }

private Iterable<Interval> splitInterval(Interval interval)
private Iterable<Interval> splitInterval(Interval interval, Period period)
{
if (interval.getEndMillis() == interval.getStartMillis()) {
return Lists.newArrayList(interval);
@@ -106,4 +142,9 @@ private Iterable<Interval> splitInterval(Interval interval)

return intervals;
}

private Period getChunkPeriod(Query<T> query) {
String p = query.getContextValue(QueryContextKeys.CHUNK_PERIOD, "P0D");
return Period.parse(p);
}
}
@@ -0,0 +1,49 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query;

import io.druid.guice.annotations.Processing;

import java.util.concurrent.ExecutorService;

import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;

public class IntervalChunkingQueryRunnerDecorator
{
private final ExecutorService executor;
private final QueryWatcher queryWatcher;
private final ServiceEmitter emitter;

@Inject
public IntervalChunkingQueryRunnerDecorator(@Processing ExecutorService executor, QueryWatcher queryWatcher,
ServiceEmitter emitter)
{
this.executor = executor;
this.queryWatcher = queryWatcher;
this.emitter = emitter;
}

public <T> QueryRunner<T> decorate(QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
{
return new IntervalChunkingQueryRunner<T>(delegate, (QueryToolChest<T, Query<T>>)toolChest,
executor, queryWatcher, emitter);
}
}
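The decorator gives every query-type toolchest a single, uniform way to wrap its runner with chunking. The wrapping pattern itself can be sketched generically (the interfaces below are illustrative stand-ins, not Druid's `QueryRunner`/`QueryToolChest`):

```java
import java.util.function.Function;

public class DecoratorSketch
{
  interface Runner
  {
    String run(String query);
  }

  // Wrap a base runner so extra behavior (here, tagging the result) is applied
  // uniformly, the way IntervalChunkingQueryRunnerDecorator wraps the runner
  // produced by each per-query-type toolchest.
  static Runner decorate(Runner base, Function<String, String> transform)
  {
    return query -> transform.apply(base.run(query));
  }

  public static void main(String[] args)
  {
    Runner base = q -> "result(" + q + ")";
    Runner decorated = decorate(base, r -> "[chunked] " + r);
    System.out.println(decorated.run("topN")); // prints [chunked] result(topN)
  }
}
```

Injecting the decorator (rather than constructing an `IntervalChunkingQueryRunner` directly) is what lets the tests above substitute a no-op decorator.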
34 changes: 0 additions & 34 deletions processing/src/main/java/io/druid/query/QueryConfig.java

This file was deleted.
