Skip to content

Commit

Permalink
Broker: Add ability to inline subqueries.
Browse files Browse the repository at this point in the history
The main changes:

- ClientQuerySegmentWalker: Add ability to inline queries.
- Query: Add "getSubQueryId" and "withSubQueryId" methods.
- QueryMetrics: Add "subQueryId" dimension.
- ServerConfig: Add new "maxSubqueryRows" parameter, which is used by
  ClientQuerySegmentWalker to limit how many rows can be inlined per
  query.
- IndexedTableJoinMatcher: Allow creating keys on top of unknown types,
  by assuming they are strings. This is useful because not all types are
  known for fields in query results.
- InlineDataSource: Store RowSignature rather than component parts. Add
  more zealous "equals" and "hashCode" methods to ease testing.
- Moved QuerySegmentWalker test code from CalciteTests and
  SpecificSegmentsQueryWalker in druid-sql to QueryStackTests in
  druid-server. Use this to spin up a new ClientQuerySegmentWalkerTest.
  • Loading branch information
gianm committed Mar 18, 2020
1 parent 4c620b8 commit 91ea2e9
Show file tree
Hide file tree
Showing 50 changed files with 2,099 additions and 732 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.NoopEscalator;
Expand Down Expand Up @@ -183,20 +183,19 @@ public void setup()
log.info("Starting benchmark setup using cacheDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment);
final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment);

final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerate = CalciteTests.createQueryRunnerFactoryConglomerate();
closer.register(conglomerate.rhs);
final QueryRunnerFactoryConglomerate conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);

final SpecificSegmentsQuerySegmentWalker walker = new SpecificSegmentsQuerySegmentWalker(conglomerate.lhs).add(
final SpecificSegmentsQuerySegmentWalker walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
dataSegment,
index
);
closer.register(walker);

final SchemaPlus rootSchema =
CalciteTests.createMockRootSchema(conglomerate.lhs, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
plannerFactory = new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate.lhs),
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
CalciteTests.createOperatorTable(),
CalciteTests.createExprMacroTable(),
plannerConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
Expand All @@ -39,6 +38,7 @@
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.NoopEscalator;
Expand Down Expand Up @@ -110,9 +110,7 @@ public void setup()
log.info("Starting benchmark setup using tmpDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment);

final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment);
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
final QueryRunnerFactoryConglomerate conglomerate = conglomerateCloserPair.lhs;
final QueryRunnerFactoryConglomerate conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
final PlannerConfig plannerConfig = new PlannerConfig();

this.walker = closer.register(new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.joda.time.Duration;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -198,6 +199,19 @@ public String getId()
return query.getId();
}

@Override
public Query<T> withSubQueryId(String subQueryId)
{
return new MaterializedViewQuery<>(query.withSubQueryId(subQueryId), optimizer);
}

@Nullable
@Override
public String getSubQueryId()
{
return query.getSubQueryId();
}

@Override
public MaterializedViewQuery withDataSource(DataSource dataSource)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
Expand All @@ -45,6 +44,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
Expand Down Expand Up @@ -88,10 +88,8 @@ public class TDigestSketchSqlAggregatorTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}

@AfterClass
Expand Down Expand Up @@ -193,7 +191,7 @@ public void testComputingSketchOnNumericValues() throws Exception
authenticationResult
).toList();
final List<String[]> expectedResults = ImmutableList.of(
new String[] {
new String[]{
"\"AAAAAT/wAAAAAAAAQBgAAAAAAABAaQAAAAAAAAAAAAY/8AAAAAAAAD/wAAAAAAAAP/AAAAAAAABAAAAAAAAAAD/wAAAAAAAAQAgAAAAAAAA/8AAAAAAAAEAQAAAAAAAAP/AAAAAAAABAFAAAAAAAAD/wAAAAAAAAQBgAAAAAAAA=\""
}
);
Expand Down Expand Up @@ -265,7 +263,7 @@ public void testComputingQuantileOnPreAggregatedSketch() throws Exception
authenticationResult
).toList();
final List<double[]> expectedResults = ImmutableList.of(
new double[] {
new double[]{
1.1,
2.9,
5.3,
Expand Down Expand Up @@ -319,7 +317,7 @@ public void testGeneratingSketchAndComputingQuantileOnFly() throws Exception
authenticationResult
).toList();
final List<double[]> expectedResults = ImmutableList.of(
new double[] {
new double[]{
1.0,
3.5,
6.0
Expand Down Expand Up @@ -390,7 +388,7 @@ public void testQuantileOnNumericValues() throws Exception
authenticationResult
).toList();
final List<double[]> expectedResults = ImmutableList.of(
new double[] {
new double[]{
1.0,
3.5,
6.0
Expand Down Expand Up @@ -457,7 +455,8 @@ public void testCompressionParamForTDigestQuantileAgg() throws Exception
),
new TDigestSketchAggregatorFactory("a2:agg", "m1",
300
)))
)
))
.postAggregators(
new TDigestSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.0f),
new TDigestSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.5f),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
Expand Down Expand Up @@ -59,6 +58,7 @@
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
Expand Down Expand Up @@ -105,10 +105,8 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
Expand Down Expand Up @@ -63,6 +62,7 @@
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
Expand Down Expand Up @@ -105,10 +105,8 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
Expand Down Expand Up @@ -56,6 +55,7 @@
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
Expand Down Expand Up @@ -101,10 +101,8 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}

@AfterClass
Expand Down
Loading

0 comments on commit 91ea2e9

Please sign in to comment.