Implement force push down for nested group by query #5471

Merged 2 commits into apache:master on Oct 22, 2018

Conversation

samarthjain (Author)

@gianm @jihoonson - creating a PR as we discussed over email. The pull request isn't complete by any means; I just wanted to get your feedback on the approach I have taken. For now, I have implemented the force push down feature. Once we have finalized the overall approach, I will provide automatic push down in a separate patch. Thanks!

@jihoonson (Contributor)

Great! I'll take a look soon.

@jihoonson (Contributor) commented Mar 7, 2018

Hi @samarthjain, I'll do my review today or tomorrow. Before that, I'd like to ask one question. It looks like a query context value decides whether or not nested query execution is pushed down. Does this mean all nested subqueries are pushed to historicals if the query is deeply nested?

@samarthjain (Author)

@jihoonson - Currently, when the user sets the push-down flag on the outermost query, the broker executes the entire nested query on the historical nodes. The broker then merges the results returned by the historical nodes as usual, except that in this case the rows returned by the historical nodes have the structure of the outermost part of the query.

@jihoonson (Contributor)

Thanks. It sounds good for the first step. Will leave some comments soon!

@jihoonson (Contributor) left a comment

@samarthjain nice work! Left some comments. The current approach looks good to me.

} else {
return groupByStrategy.mergeResults(runner, query, context);
}
}

public static boolean shouldPushDownQuery(GroupByQuery q)
{
return QueryContexts.parseBoolean(q, GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false);
jihoonson (Contributor):

Can be simplified to return q.getContextBoolean(GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false); and then https://github.com/druid-io/druid/pull/5471/files#diff-76d2b703319f06a8103ebce8a58ef7fcR221 is not necessary.
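
Applying that suggestion, the helper would reduce to roughly the following (a sketch; getContextBoolean is the existing accessor on Query that jihoonson refers to):

```java
public static boolean shouldPushDownQuery(GroupByQuery q)
{
  // Query#getContextBoolean already handles a missing key and type coercion,
  // so the package-private QueryContexts.parseBoolean helper is no longer needed here.
  return q.getContextBoolean(GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false);
}
```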

// Unset the push down nested query flag so that the historical doesn't erroneously end up pushing down the query itself
pushDownQueryBuilder.overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false));
GroupByQuery queryToPushDown = pushDownQueryBuilder.build();
innerQueryBuilder.setQueryToPushDown(queryToPushDown);
jihoonson (Contributor):

Would you elaborate more on this? setQueryToPushDown() means setting the outer query for the inner query?

samarthjain (Author):

This was done to make sure that we can pass the entire nested query over to the historical nodes. The role of the innermost query is limited to figuring out which segments to query, but what gets executed on the historical nodes is this pushed down query, which is nothing but the complete query. I think there is some improvement possible here. Right now, I am intercepting in DirectDruidClient to see whether the pushedDownQuery is present. If so, the DirectDruidClient gets hold of it and sends it over to the historical node.

Relevant snippet in DirectDruidClient: https://github.com/druid-io/druid/pull/5471/files#diff-604c8c11338380d32a71a7621f031d2aR172

@@ -143,7 +156,7 @@ public boolean apply(Row input)

Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
query,
true,
true, //todo: samarth think about this attribute. I think this might be false in our case?
jihoonson (Contributor):

Not sure, but this comment might help.

If isInputRaw is true, transformations such as timestamp truncation and extraction functions have not
been applied to the input rows yet, for example, in a nested query, if an extraction function is being
applied in the outer query to a field of the inner query. This method must apply those transformations.

@@ -102,6 +102,7 @@ public static Builder builder()

private final boolean applyLimitPushDown;
private final Function<Sequence<Row>, Sequence<Row>> postProcessingFn;
private final GroupByQuery pushedDownQuery;
jihoonson (Contributor):

Not sure this should be kept. When is this variable used?

@samarthjain (Author) commented Mar 12, 2018

Thanks for the feedback @jihoonson. I will address it in my next commit. Can you point me to how I should go about writing tests for this feature? I looked at GroupByQueryRunnerTest and couldn't find an easy way of mocking the push down behavior. I essentially need a way to mimic the nested query getting executed on the historical nodes, and then the outermost query operating on the results obtained after merging the results returned by the historicals. Would it make sense to instead use Druid SQL and write tests in something like CalciteQueryTest?

@jon-wei (Contributor) commented Mar 12, 2018

@samarthjain I think you can check out GroupByLimitPushDownMultiNodeMergeTest for an example of testing that kind of merging

@jihoonson (Contributor)

@jon-wei thanks. It looks like a good example for testing merging.

@drcrallen (Contributor)

Can this PR include more description in the master comment about what it is accomplishing and how it is accomplishing it?

@samarthjain (Author)

@jihoonson - I have updated the pull request with the review comments, did some cleanup, and added tests.

@samarthjain (Author)

The way nested query execution is implemented today, the innermost query is executed on the historical nodes while the outer queries are executed on the broker node. This can get problematic when the inner query groups on a high-cardinality dimension, returning too many records for the broker node to handle. One of the options that we have been internally testing and exploring is the capability to push down the complete nested query to the historical nodes. Each historical node then executes the nested query primarily over the segments it owns for that query. Because the number of records returned by each historical node would potentially be much smaller in this case, the final merge and aggregation would be less intensive for the broker. The broker also won't need to perform any more dimension or segment level filtering, since that is taken care of on the historical nodes themselves.

Note that this way of distributing the aggregation to the historical nodes doesn't always return the same results as doing the final aggregation on the broker node. However, there is a good set of cases (for example, aggregating on dimensions that are used for hashing during ingestion) where this kind of push down logic will return the right results. I can get into this in more detail, but the general idea was to leave the onus on the user to figure out whether their data layout allows for this kind of push down.

This implementation provides the user a way of forcing nested query push down through a query context variable. The next cut will focus on doing this automatically under the following conditions (credit to @gianm for clearly articulating them):

  1. The groupBy query granularity is equal to, or finer than, segment granularity;

and either:

2a) A time chunk uses HashBasedNumberedShardSpec, partitionDimensions is nonempty, the grouping dimension set contains all of the shard partitionDimensions, and there are no "extension" partitions (partitions with partitionNum >= partitions, which are created by ingest tasks that append data)

or:

2b) A time chunk uses SingleDimensionShardSpec and the grouping dimension set contains the shard dimension.

If Druid detects this, it should push down the query automatically for that time chunk. There will be situations where the query can be pushed down for some time chunks but not others (for example: consider a data pipeline that loads data unpartitioned in realtime, and later has a reindexing job to partition historical data by some useful dimension). In this case, ideally the broker should be capable of pushing down the query for the time chunks where it can be correctly pushed down, and not pushing it down for the others.
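
For illustration, here is a rough sketch of how a force-pushed-down nested groupBy might be built with the GroupByQuery builder. The datasource, interval, dimension, and aggregator names are made up, the exact builder method names may differ slightly from the real API, and only the forceNestedQueryPushDown context key comes from this patch:

```java
// Inner query: groups the raw datasource by a (potentially high-cardinality) dimension.
GroupByQuery innerQuery = GroupByQuery.builder()
    .setDataSource("wikipedia")  // hypothetical datasource
    .setQuerySegmentSpec(new MultipleIntervalSegmentSpec(
        Collections.singletonList(Intervals.of("2018-01-01/2018-01-02"))))  // hypothetical interval
    .setGranularity(Granularities.ALL)
    .setDimensions(Collections.singletonList(new DefaultDimensionSpec("cityName", "d0")))
    .setAggregatorSpecs(Collections.singletonList(new CountAggregatorFactory("cnt")))
    .build();

// Outer query: re-aggregates the inner result. Setting forceNestedQueryPushDown on the
// outermost query makes the broker ship the whole nested query to the historicals.
GroupByQuery outerQuery = GroupByQuery.builder()
    .setDataSource(new QueryDataSource(innerQuery))
    .setQuerySegmentSpec(innerQuery.getQuerySegmentSpec())
    .setGranularity(Granularities.ALL)
    .setDimensions(Collections.singletonList(new DefaultDimensionSpec("d0", "city")))
    .setAggregatorSpecs(Collections.singletonList(new LongSumAggregatorFactory("totalCnt", "cnt")))
    .setContext(ImmutableMap.<String, Object>of(
        GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true))
    .build();
```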

@jihoonson (Contributor)

@samarthjain thanks. I'll take another look.

@jihoonson (Contributor)

@samarthjain we are suffering from pretty frequent CI failures. If the CI for your PR fails, you first need to check whether the failure relates to the changes in your PR. This can be done by running unit tests locally.

Usually the memory configuration for CI shouldn't be a problem. If you found something wrong in the CI configuration, please make another PR to fix it.

@samarthjain (Author)

@jihoonson - sorry for the late reply. I have cleaned up the code, removed the pom changes and added an integration test. I wanted to check with you: how is the integration test triggered, and how is the data needed for the test loaded? I looked at existing integration tests and couldn't figure out where that magic happens.

@samarthjain (Author) commented Apr 5, 2018

[ERROR] before(io.druid.tests.query.ITNestedQueryPushDownTest) Time elapsed: 403.943 s <<< FAILURE! io.druid.java.util.common.ISE: Max number of retries[10] exceeded for Task[wikipedia segment load]. Failing. at io.druid.tests.query.ITNestedQueryPushDownTest.before(ITNestedQueryPushDownTest.java:47)

Looks like the framework is not able to locate the wikipedia data source which the new test needs. @jihoonson, the test passes for me locally after loading the wikipedia datasource as described at http://druid.io/docs/latest/tutorials/quickstart.html. It would be great if you could let me know how to load new data for the IT tests.

@samarthjain changed the title from "First cut at implementing push down nested query execution" to "Implement force push down for nested group by query" on Apr 9, 2018
@samarthjain (Author)

@jihoonson - any insights on how to add and execute new integration tests for CI?

@jihoonson (Contributor)

@samarthjain I'll take a look today. Sorry for the late review.

@jihoonson (Contributor) left a comment

@samarthjain thanks for adding tests, especially an integration test! I left some comments.
Besides those, I have three more comments on the overall implementation design.

  • If I understand correctly, once forceNestedQueryPushDown is set to true, the innermost query is passed from GroupByQueryQueryToolChest to DirectDruidClient to find which historicals should receive the query, and then the pushed-down nested query is sent from DirectDruidClient. I think this kind of value hijacking can cause many potential bugs. Instead, I suggest adding an interface to Query to find the dataSource of the inner-most query.
  • You added a flag like wasQueryPushedDown to existing methods to reuse them. As a result, some methods have too many responsibilities, and I think this makes it difficult to track down the query execution path. For example, GroupByRowProcessor.process() is supposed to be called only when processing sub queries, but now it looks like it is called when merging sub queries as well. I think this was for avoiding code duplication, but it could be improved by splitting them and extracting common code paths into utility methods.
  • A groupBy v2 query requires acquiring a resource before query execution. We currently check that there are enough merge buffers to execute a given query. Since historicals have processed only the inner-most query so far, they have always required a single merge buffer. However, they can process nested queries after this patch, so resource reservation should be changed as well (a rough sketch of the counting idea is below). Please see here for how the number of merge buffers is calculated for brokers.
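
For illustration only, a minimal sketch of the counting idea mentioned in the last point; this is not the actual GroupByStrategyV2 code, and the method name is made up:

```java
// Counts how many merge buffers a (possibly nested) groupBy query would need:
// each outer level has to merge the results of its subquery, which takes a buffer,
// while a non-nested query is merged directly off the per-segment results.
static int countRequiredMergeBuffers(Query<?> query)
{
  DataSource dataSource = query.getDataSource();
  if (dataSource instanceof QueryDataSource) {
    return 1 + countRequiredMergeBuffers(((QueryDataSource) dataSource).getQuery());
  }
  return 0;
}
```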

@@ -240,7 +240,7 @@
}
}

static <T> boolean parseBoolean(Query<T> query, String key, boolean defaultValue)
public static <T> boolean parseBoolean(Query<T> query, String key, boolean defaultValue)
jihoonson (Contributor):

This isn't supposed to be used outside of this class. Please consider #5471 (comment).

new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
jihoonson (Contributor):

Exception is not thrown.

{
return coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE);
}
}, "wikipedia segment load"
jihoonson (Contributor):

Please break the line before "wikipedia segment load".

{
// ensure that wikipedia segments are loaded completely
RetryUtil.retryUntilTrue(
new Callable<Boolean>()
jihoonson (Contributor):

nit: this Callable can be simplified into () -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE).

import io.druid.segment.DimensionSelector;
import io.druid.segment.column.ValueType;

public class PushDownQueryDimensionSpec implements DimensionSpec
jihoonson (Contributor):

Would you elaborate on how this class is used? It looks like it does nothing except forward requests to the underlying delegate.

samarthjain (Author):

The dimension name for this dimension spec is the output name of the delegate. This dimension spec is only used in the case of nested query push down.

@Override
public String getDimension()
{
  // the dimension name is the same as the output name
  return delegate.getOutputName();
}

).withOverriddenContext(
ImmutableMap.<String, Object>of(
// the having spec shouldn't be passed down, so we need to convey the existing limit push down status
GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, query.isApplyLimitPushDown(),
jihoonson (Contributor):

Would you elaborate more on this? Why is the query's limitPushDown passed even though the having spec is never passed?


subqueryContext.putAll(((QueryDataSource) dataSource).getQuery().getContext());
pushDownQuery = groupByStrategy.supportsNestedQueryPushDown() && shouldPushDownQuery(query);
if (pushDownQuery) {
return getResultsOnPushDown(groupByStrategy, query, resource, runner, context);
jihoonson (Contributor):

IMO, this kind of return statement in the middle of a method can make it difficult for readers to follow. I suggest splitting this method into several smaller methods.

@@ -108,4 +108,9 @@ public GroupByQueryRunner(Segment segment, final GroupByStrategySelector strateg
return strategySelector.strategize((GroupByQuery) query).process((GroupByQuery) query, adapter);
}
}

public GroupByStrategySelector getStrategySelector()
jihoonson (Contributor):

Please annotate with @VisibleForTesting.

subqueryResult,
subquery
);
return groupByStrategy.processSubqueryResult(newSubquery, query, resource, finalizingResults, pushDownQuery);
jihoonson (Contributor):

pushDownQuery is always false.

final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);

final AggregatorFactory[] aggregatorFactories = new AggregatorFactory[query.getAggregatorSpecs().size()];
for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
aggregatorFactories[i] = query.getAggregatorSpecs().get(i);
AggregatorFactory af = query.getAggregatorSpecs().get(i);
aggregatorFactories[i] = wasQueryPushedDown ? af.getCombiningFactory() : af;
jihoonson (Contributor):

Why is the combining factory used here?

samarthjain (Author):

In the case of nested query push down, the historicals are going to return already-aggregated results, and the broker's job is to combine these aggregated results, hence the use of the combining aggregator factory.
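
As an aside, a small hedged illustration of what the combining factory does for a sum; the column names here are made up and this is not code from the patch:

```java
// On the historical, the pushed-down query aggregates the raw column "added" into "sumAdded".
AggregatorFactory original = new LongSumAggregatorFactory("sumAdded", "added");

// On the broker, the combining factory re-aggregates the partial results by summing the
// already-aggregated "sumAdded" column instead of re-reading the raw "added" column.
AggregatorFactory combining = original.getCombiningFactory();
// combining behaves like new LongSumAggregatorFactory("sumAdded", "sumAdded")
```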

jihoonson (Contributor):

Thanks.

Can we do this kind of query rewriting for dimensions and aggregatorFactories in GroupByQueryQueryToolChest.getResultsOnPushDown()? It would be better because we could put the necessary pieces for nestedQueryPushDown together where isNestedQueryPushDown() is checked.

@jihoonson (Contributor)

The integration test fails on my laptop as well. Also, when I tested the query below against my local cluster, I hit this error.

{
  "query" : "select sum(cnt) from ( select cityName, sum(cnt) cnt from ( select countryName, cityName, count(*) cnt from druid.wikiticker where countryName like '%a%' group by cityName, countryName) t1 group by cityName ) t2",
  "context": {
    "forceNestedQueryPushDown": true
  }
}
2018-04-10T05:48:07,642 ERROR [qtp386040589-63[groupBy_[wikiticker]_02d4a8bd-30db-40a9-9520-ae825ac508b2]] io.druid.server.QueryResource - Exception handling request: {class=io.druid.server.QueryResource, exceptionType=class java.lang.UnsupportedOperationException, exceptionMessage=data source type 'io.druid.query.QueryDataSource' unsupported, exception=java.lang.UnsupportedOperationException: data source type 'io.druid.query.QueryDataSource' unsupported, query=GroupByQuery{dataSource='GroupByQuery{dataSource='GroupByQuery{dataSource='wikiticker', querySegmentSpec=MultipleIntervalSegmentSpec{intervals=[-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z]}, virtualColumns=[], limitSpec=NoopLimitSpec, dimFilter=countryName LIKE '%a%', granularity=AllGranularity, dimensions=[DefaultDimensionSpec{dimension='cityName', outputName='d0', outputType='STRING'}, DefaultDimensionSpec{dimension='countryName', outputName='d1', outputType='STRING'}], aggregatorSpecs=[CountAggregatorFactory{name='a0'}], postAggregatorSpecs=[], havingSpec=null}', querySegmentSpec=MultipleIntervalSegmentSpec{intervals=[-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z]}, virtualColumns=[], limitSpec=NoopLimitSpec, dimFilter=null, granularity=AllGranularity, dimensions=[DefaultDimensionSpec{dimension='d0', outputName='_d0', outputType='STRING'}], aggregatorSpecs=[LongSumAggregatorFactory{fieldName='a0', expression='null', name='_a0'}], postAggregatorSpecs=[], havingSpec=null}', querySegmentSpec=MultipleSpecificSegmentSpec{descriptors=[SegmentDescriptor{interval=2015-09-12T00:00:00.000Z/2015-09-12T01:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T01:00:00.000Z/2015-09-12T02:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T02:00:00.000Z/2015-09-12T03:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T03:00:00.000Z/2015-09-12T04:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T04:00:00.000Z/2015-09-12T05:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T05:00:00.000Z/2015-09-12T06:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T06:00:00.000Z/2015-09-12T07:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T07:00:00.000Z/2015-09-12T08:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T08:00:00.000Z/2015-09-12T09:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T09:00:00.000Z/2015-09-12T10:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T10:00:00.000Z/2015-09-12T11:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T11:00:00.000Z/2015-09-12T12:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T12:00:00.000Z/2015-09-12T13:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T13:00:00.000Z/2015-09-12T14:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T14:00:00.000Z/2015-09-12T15:00:00.000Z, version='2018-04-10T05:35:49.715Z', 
partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T15:00:00.000Z/2015-09-12T16:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T16:00:00.000Z/2015-09-12T17:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T17:00:00.000Z/2015-09-12T18:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T18:00:00.000Z/2015-09-12T19:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T19:00:00.000Z/2015-09-12T20:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T20:00:00.000Z/2015-09-12T21:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T21:00:00.000Z/2015-09-12T22:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T22:00:00.000Z/2015-09-12T23:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}, SegmentDescriptor{interval=2015-09-12T23:00:00.000Z/2015-09-13T00:00:00.000Z, version='2018-04-10T05:35:49.715Z', partitionNumber=0}]}, virtualColumns=[], limitSpec=NoopLimitSpec, dimFilter=null, granularity=AllGranularity, dimensions=[], aggregatorSpecs=[LongSumAggregatorFactory{fieldName='_a0', expression='null', name='a0'}], postAggregatorSpecs=[], havingSpec=null}, peer=10.0.0.52}
java.lang.UnsupportedOperationException: data source type 'io.druid.query.QueryDataSource' unsupported
        at io.druid.server.coordination.ServerManager.getQueryRunnerForIntervals(ServerManager.java:130) ~[druid-server-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at io.druid.query.spec.MultipleIntervalSegmentSpec.lookup(MultipleIntervalSegmentSpec.java:57) ~[druid-processing-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at io.druid.query.BaseQuery.getRunner(BaseQuery.java:113) ~[druid-processing-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at io.druid.query.QueryPlus.run(QueryPlus.java:145) ~[druid-processing-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at io.druid.server.QueryLifecycle.execute(QueryLifecycle.java:257) ~[druid-server-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at io.druid.server.QueryResource.doPost(QueryResource.java:184) [druid-server-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_161]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_161]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_161]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
        at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60) [jersey-server-1.19.3.jar:1.19.3]
        at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205) [jersey-server-1.19.3.jar:1.19.3]
        at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75) [jersey-server-1.19.3.jar:1.19.3]
        at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302) [jersey-server-1.19.3.jar:1.19.3]
        at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108) [jersey-server-1.19.3.jar:1.19.3]
        at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) [jersey-server-1.19.3.jar:1.19.3]
        at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84) [jersey-server-1.19.3.jar:1.19.3]
        at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542) [jersey-server-1.19.3.jar:1.19.3]
        at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473) [jersey-server-1.19.3.jar:1.19.3]
        at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419) [jersey-server-1.19.3.jar:1.19.3]
        at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409) [jersey-server-1.19.3.jar:1.19.3]
        at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:409) [jersey-servlet-1.19.3.jar:1.19.3]
        at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:558) [jersey-servlet-1.19.3.jar:1.19.3]
        at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:733) [jersey-servlet-1.19.3.jar:1.19.3]
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) [javax.servlet-api-3.1.0.jar:3.1.0]
        at com.google.inject.servlet.ServletDefinition.doServiceImpl(ServletDefinition.java:286) [guice-servlet-4.1.0.jar:?]
        at com.google.inject.servlet.ServletDefinition.doService(ServletDefinition.java:276) [guice-servlet-4.1.0.jar:?]
        at com.google.inject.servlet.ServletDefinition.service(ServletDefinition.java:181) [guice-servlet-4.1.0.jar:?]
        at com.google.inject.servlet.ManagedServletPipeline.service(ManagedServletPipeline.java:91) [guice-servlet-4.1.0.jar:?]
        at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:85) [guice-servlet-4.1.0.jar:?]
        at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:120) [guice-servlet-4.1.0.jar:?]
        at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:135) [guice-servlet-4.1.0.jar:?]
        at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759) [jetty-servlet-9.3.19.v20170502.jar:9.3.19.v20170502]
        at io.druid.server.security.PreResponseAuthorizationCheckFilter.doFilter(PreResponseAuthorizationCheckFilter.java:84) [druid-server-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759) [jetty-servlet-9.3.19.v20170502.jar:9.3.19.v20170502]
        at io.druid.server.security.AllowAllAuthenticator$1.doFilter(AllowAllAuthenticator.java:85) [druid-server-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at io.druid.server.security.AuthenticationWrappingFilter.doFilter(AuthenticationWrappingFilter.java:60) [druid-server-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759) [jetty-servlet-9.3.19.v20170502.jar:9.3.19.v20170502]
        at io.druid.server.security.SecuritySanityCheckFilter.doFilter(SecuritySanityCheckFilter.java:86) [druid-server-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759) [jetty-servlet-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582) [jetty-servlet-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:224) [jetty-server-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180) [jetty-server-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512) [jetty-servlet-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185) [jetty-server-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112) [jetty-server-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) [jetty-server-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493) [jetty-server-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52) [jetty-server-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:169) [jetty-server-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134) [jetty-server-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.server.Server.handle(Server.java:534) [jetty-server-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320) [jetty-server-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251) [jetty-server-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283) [jetty-io-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:108) [jetty-io-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93) [jetty-io-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303) [jetty-util-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148) [jetty-util-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136) [jetty-util-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671) [jetty-util-9.3.19.v20170502.jar:9.3.19.v20170502]
        at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589) [jetty-util-9.3.19.v20170502.jar:9.3.19.v20170502]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]

@samarthjain (Author)

Thanks for the detailed feedback, @jihoonson . I will review it and reply by tonight.

@samarthjain (Author) commented Apr 11, 2018

> GroupByRowProcessor.process() is supposed to be called only when processing sub queries, but now it looks like it is called when merging sub queries as well. I think it was for avoiding code duplication, but it can be improved by splitting them and extracting common code paths into utility methods.

The process method isn't doing any additional merges. The only change is to make it cognizant of the fact that the sub-query sequence it is processing, in the case of forcePushDown, is now the result of running the pushed down nested query instead.

> A groupBy v2 query requires acquiring a resource before query execution. We currently check that there are enough merge buffers to execute a given query. Since historicals have processed only the inner-most query so far, they have always required a single merge buffer. However, they can process nested queries after this patch, so resource reservation should be changed as well. Please see here for how the number of merge buffers is calculated for brokers.

Good point. I will make sure that historicals have the same number of merge buffers as the broker. From a first look at the code, it seems like they should, but I will verify and hopefully add a test for it too.

> If I understand correctly, once forceNestedQueryPushDown is set to true, the innermost query is passed from GroupByQueryQueryToolChest to DirectDruidClient to find which historicals should receive the query, and then the pushed-down nested query is sent from DirectDruidClient. I think this kind of value hijacking can cause many potential bugs. Instead, I suggest adding an interface to Query to find the dataSource of the inner-most query.

Thanks. Let me see how I can make this cleaner. I am not the biggest fan of the approach I took, either.
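
One possible shape for such an interface, sketched only to clarify the suggestion (the method name is hypothetical and this is not what the patch ended up doing):

```java
// Walks down QueryDataSource wrappers to find the datasource the inner-most query reads,
// so segment lookup can happen without smuggling the pushed-down query through DirectDruidClient.
static DataSource getInnermostDataSource(Query<?> query)
{
  DataSource dataSource = query.getDataSource();
  while (dataSource instanceof QueryDataSource) {
    dataSource = ((QueryDataSource) dataSource).getQuery().getDataSource();
  }
  return dataSource;
}
```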

@samarthjain (Author)

> The integration test fails on my laptop as well. Also, when I tested the query below against my local cluster, I hit this error.

Did you update the jar on the historicals too?

@jihoonson (Contributor)

@samarthjain yes, I tested on my local laptop and used jars of the same version for all modules.

@samarthjain (Author)

Addressed the review comments you left, @jihoonson. I was able to get rid of the hacky approach I took for getting hold of the complete nested query in DirectDruidClient by using it directly higher in the stack in GroupByQueryQueryToolChest and GroupByStrategyV2. Also fixed the failure you ran into when using a multi-level nested query. To run the integration test on my laptop, I had to add a config file and supply its path in the ConfigFileConfigProvider. The contents of the config file looked like this:
{ "broker_host": "127.0.0.1", "broker_port": "8082", "coordinator_host": "127.0.0.1", "coordinator_port": "8081", "middlemanager_host": "127.0.0.1", "middlemanager_port":"8090", "zookeeper_hosts": "127.0.0.1:2181" }

I also had to change IntegrationTestingConfigProvider to use the ConfigFileConfigProvider and not DockerConfigProvider.

@nishantmonu51 - would you happen to know how I can get my newly added integration test to run automatically in the pre-checkin builds?

@jihoonson (Contributor)

@samarthjain thanks for the update! I'll take another look.

> To run the integration test on my laptop, I had to add a config file and supply its path in the ConfigFileConfigProvider. The contents of the config file looked like this:
> { "broker_host": "127.0.0.1", "broker_port": "8082", "coordinator_host": "127.0.0.1", "coordinator_port": "8081", "middlemanager_host": "127.0.0.1", "middlemanager_port":"8090", "zookeeper_hosts": "127.0.0.1:2181" }

Would you elaborate on why this configuration is needed? It's supposed to get addresses automatically in the integration tests.

> @nishantmonu51 - would you happen to know how I can get my newly added integration test to run automatically in the pre-checkin builds?

What do you mean by this? If you add an integration test, it should run automatically on Travis. You can verify this by checking the Travis log or by running the integration test yourself.

@jihoonson (Contributor)

Also, please check the CI failure.

Results :
Failed tests: 
  GroupByLimitPushDownInsufficientBufferTest.testPartialLimitPushDownMergeForceAggs:605 expected:<MapBasedRow{timestamp=1970-01-01T00:00:00.000Z, event={dimA=mango, metA=190}}> but was:<MapBasedRow{timestamp=1970-01-01T00:00:00.000Z, event={dimA=zebra, metA=180}}>
Tests in error: 
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
  GroupByQueryRunnerTest.testSubqueryWithFirstLast:6177 » ClassCast java.lang.Lo...
Tests run: 83353, Failures: 1, Errors: 20, Skipped: 62

@samarthjain (Author)

Not sure why the Travis build is complaining; it seems to build fine locally. @jihoonson - is there a way for you to retry the build?

@samarthjain (Author)

Ok, looks like there was a rebase issue with my branch. @jihoonson - it would be awesome if you could take a look when you get a chance.

@samarthjain (Author)

@jihoonson - I see that a 0.13.0 release is being planned. It would be great if this feature can be added to the 0.13.0 release.

@jihoonson (Contributor)

Hi @samarthjain! Thank you for updating this PR. I'll take a look soon, probably next week at the earliest. Unfortunately, the feature set for 0.13.0 is frozen, but I'm scheduling this for 0.13.1. Thanks!

@fjy added this to the 0.13.1 milestone on Oct 10, 2018
@jihoonson (Contributor) left a comment

@samarthjain I left probably my last comments. Please take a look. Thanks!

final AggregateResult retVal = filteredSequence.accumulate(AggregateResult.ok(), accumulator);
final AggregateResult retVal = wasQueryPushedDown
? rows.accumulate(AggregateResult.ok(), accumulator)
: filteredSequence.accumulate(AggregateResult.ok(), accumulator);
jihoonson (Contributor):

Is this because the filters are already processed when wasQueryPushedDown is true? Then, please add a comment about it here.

jihoonson (Contributor):

Also, I think we can clarify this part a bit more. For example, all filter-related variables like filteredSequence, filterMatcher, filter, etc. don't have to be initialized when the query was pushed down. Would you please extract this part into a separate method and skip calling it if wasQueryPushedDown is true?
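
For example, after the extraction the accumulation above might look roughly like this (a sketch; buildFilteredSequence is a hypothetical name for the extracted method):

```java
// When the query was pushed down, the historicals have already applied the filters,
// so the filter machinery can be skipped entirely on this path.
final AggregateResult retVal = wasQueryPushedDown
    ? rows.accumulate(AggregateResult.ok(), accumulator)
    : buildFilteredSequence(query, rows).accumulate(AggregateResult.ok(), accumulator);
```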

);
GroupByQuery rewrittenQuery = ((GroupByQueryQueryToolChest) toolChest).rewriteNestedQueryForPushDown(nestedQuery);
// Broker executes this code and hence has
return strategy.applyPostProcessing(strategy.processSubqueryResult(
jihoonson (Contributor):

Can we call GroupByQueryQueryToolChest.mergeResults() instead of calling this directly? I think it would be better because the unit test would execute the same logic with the production code.

@samarthjain (Author)

Thanks for your patient reviews, @jihoonson. I have updated the PR with the requested changes. Travis build seems to be timing out.

@samarthjain (Author)

@jihoonson - does this look good to merge now?

@jihoonson (Contributor) left a comment

@samarthjain thanks for the fix! I restarted Travis. +1 after CI.

@samarthjain (Author)

CI looks good. Thanks for restarting it and for all your reviews, @jihoonson. Let me know if you need me to squash the commits.

@jihoonson (Contributor)

@samarthjain nice! You don't have to squash commits in most cases. I'll merge this shortly.

@jihoonson merged commit 359576a into apache:master on Oct 22, 2018
Assert.assertEquals(outputNameAgg, rewrittenQuery.getAggregatorSpecs().get(0).getName());
}

private List<QueryRunner<Row>> getRunner1()
Member:

Unused method

return runners;
}

private List<QueryRunner<Row>> getRunner2()
Member:

Unused method

gianm added a commit to gianm/druid that referenced this pull request Jan 21, 2020
Builds on apache#9111 and implements the datasource analysis mentioned in apache#8728. Still can't
handle join datasources, but we're a step closer.

Join-related DataSource types:

1) Add "join", "lookup", and "inline" datasources.
2) Add "getChildren" and "withChildren" methods to DataSource, which will be used
   in the future for query rewriting (e.g. inlining of subqueries).

DataSource analysis functionality:

1) Add DataSourceAnalysis class, which breaks down datasources into three components:
   outer queries, a base datasource (left-most of the highest level left-leaning join
   tree), and other joined-in leaf datasources (the right-hand branches of the
   left-leaning join tree).
2) Add "isConcrete", "isGlobal", and "isCacheable" methods to DataSource in order to
   support analysis.
3) Use the DataSourceAnalysis methods throughout the query handling stack, replacing
   various ad-hoc approaches. Most of the interesting changes are in
   ClientQuerySegmentWalker (brokers), ServerManager (historicals), and
   SinkQuerySegmentWalker (indexing tasks).

Other notes:

1) Changed TimelineServerView to return an Optional timeline, which I thought made
   the analysis changes cleaner to implement.
2) Renamed DataSource#getNames to DataSource#getTableNames, which I think is clearer.
   Also, made it a Set, so implementations don't need to worry about duplicates.
3) Added QueryToolChest#canPerformSubquery, which is now used by query entry points to
   determine whether it is safe to pass a subquery dataSource to the query toolchest.
   Fixes an issue introduced in apache#5471 where subqueries under non-groupBy-typed queries
   were silently ignored, since neither the query entry point nor the toolchest did
   anything special with them.
4) The addition of "isCacheable" should work around apache#8713, since UnionDataSource now
   returns false for cacheability.
gianm added a commit to gianm/druid that referenced this pull request Jan 22, 2020
Builds on apache#9235, using the datasource analysis functionality to replace various ad-hoc
approaches. The most interesting changes are in ClientQuerySegmentWalker (brokers),
ServerManager (historicals), and SinkQuerySegmentWalker (indexing tasks).

Other changes related to improving how we analyze queries:

1) Changes TimelineServerView to return an Optional timeline, which I thought made
   the analysis changes cleaner to implement.
2) Added QueryToolChest#canPerformSubquery, which is now used by query entry points to
   determine whether it is safe to pass a subquery dataSource to the query toolchest.
   Fixes an issue introduced in apache#5471 where subqueries under non-groupBy-typed queries
   were silently ignored, since neither the query entry point nor the toolchest did
   anything special with them.
3) Removes the QueryPlus.withQuerySegmentSpec method, which was mostly being used in
   error-prone ways (ignoring any potential subqueries, and not verifying that the
   underlying data source is actually a table). Replaces with a new function,
   Queries.withSpecificSegments, that includes sanity checks.
gianm added a commit that referenced this pull request Jan 23, 2020
Builds on #9235, using the datasource analysis functionality to replace various ad-hoc
approaches. The most interesting changes are in ClientQuerySegmentWalker (brokers),
ServerManager (historicals), and SinkQuerySegmentWalker (indexing tasks).

Other changes related to improving how we analyze queries:

1) Changes TimelineServerView to return an Optional timeline, which I thought made
   the analysis changes cleaner to implement.
2) Added QueryToolChest#canPerformSubquery, which is now used by query entry points to
   determine whether it is safe to pass a subquery dataSource to the query toolchest.
   Fixes an issue introduced in #5471 where subqueries under non-groupBy-typed queries
   were silently ignored, since neither the query entry point nor the toolchest did
   anything special with them.
3) Removes the QueryPlus.withQuerySegmentSpec method, which was mostly being used in
   error-prone ways (ignoring any potential subqueries, and not verifying that the
   underlying data source is actually a table). Replaces with a new function,
   Queries.withSpecificSegments, that includes sanity checks.