Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker: Add ability to inline subqueries. #9533

Merged
merged 3 commits into from
Mar 18, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.NoopEscalator;
Expand Down Expand Up @@ -183,20 +183,19 @@ public void setup()
log.info("Starting benchmark setup using cacheDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment);
final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment);

final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerate = CalciteTests.createQueryRunnerFactoryConglomerate();
closer.register(conglomerate.rhs);
final QueryRunnerFactoryConglomerate conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


final SpecificSegmentsQuerySegmentWalker walker = new SpecificSegmentsQuerySegmentWalker(conglomerate.lhs).add(
final SpecificSegmentsQuerySegmentWalker walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
dataSegment,
index
);
closer.register(walker);

final SchemaPlus rootSchema =
CalciteTests.createMockRootSchema(conglomerate.lhs, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
plannerFactory = new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate.lhs),
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
CalciteTests.createOperatorTable(),
CalciteTests.createExprMacroTable(),
plannerConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
Expand All @@ -39,6 +38,7 @@
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.NoopEscalator;
Expand Down Expand Up @@ -110,9 +110,7 @@ public void setup()
log.info("Starting benchmark setup using tmpDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment);

final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment);
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
final QueryRunnerFactoryConglomerate conglomerate = conglomerateCloserPair.lhs;
final QueryRunnerFactoryConglomerate conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
final PlannerConfig plannerConfig = new PlannerConfig();

this.walker = closer.register(new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.joda.time.Duration;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -198,6 +199,19 @@ public String getId()
return query.getId();
}

@Override
public Query<T> withSubQueryId(String subQueryId)
{
return new MaterializedViewQuery<>(query.withSubQueryId(subQueryId), optimizer);
}

@Nullable
@Override
public String getSubQueryId()
{
return query.getSubQueryId();
}

@Override
public MaterializedViewQuery withDataSource(DataSource dataSource)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
Expand All @@ -45,6 +44,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
Expand Down Expand Up @@ -88,10 +88,8 @@ public class TDigestSketchSqlAggregatorTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}

@AfterClass
Expand Down Expand Up @@ -193,7 +191,7 @@ public void testComputingSketchOnNumericValues() throws Exception
authenticationResult
).toList();
final List<String[]> expectedResults = ImmutableList.of(
new String[] {
new String[]{
"\"AAAAAT/wAAAAAAAAQBgAAAAAAABAaQAAAAAAAAAAAAY/8AAAAAAAAD/wAAAAAAAAP/AAAAAAAABAAAAAAAAAAD/wAAAAAAAAQAgAAAAAAAA/8AAAAAAAAEAQAAAAAAAAP/AAAAAAAABAFAAAAAAAAD/wAAAAAAAAQBgAAAAAAAA=\""
}
);
Expand Down Expand Up @@ -265,7 +263,7 @@ public void testComputingQuantileOnPreAggregatedSketch() throws Exception
authenticationResult
).toList();
final List<double[]> expectedResults = ImmutableList.of(
new double[] {
new double[]{
1.1,
2.9,
5.3,
Expand Down Expand Up @@ -319,7 +317,7 @@ public void testGeneratingSketchAndComputingQuantileOnFly() throws Exception
authenticationResult
).toList();
final List<double[]> expectedResults = ImmutableList.of(
new double[] {
new double[]{
1.0,
3.5,
6.0
Expand Down Expand Up @@ -390,7 +388,7 @@ public void testQuantileOnNumericValues() throws Exception
authenticationResult
).toList();
final List<double[]> expectedResults = ImmutableList.of(
new double[] {
new double[]{
1.0,
3.5,
6.0
Expand Down Expand Up @@ -457,7 +455,8 @@ public void testCompressionParamForTDigestQuantileAgg() throws Exception
),
new TDigestSketchAggregatorFactory("a2:agg", "m1",
300
)))
)
))
.postAggregators(
new TDigestSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.0f),
new TDigestSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.5f),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
Expand Down Expand Up @@ -59,6 +58,7 @@
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
Expand Down Expand Up @@ -105,10 +105,8 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
Expand Down Expand Up @@ -63,6 +62,7 @@
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
Expand Down Expand Up @@ -105,10 +105,8 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
Expand Down Expand Up @@ -56,6 +55,7 @@
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
Expand Down Expand Up @@ -101,10 +101,8 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}

@AfterClass
Expand Down
Loading