Skip to content
Permalink
Browse files
Allow adding calcite rules from extensions (#12715)
* Allow adding calcite rules from extensions

* fixup! Allow adding calcite rules from extensions

* Move Rules to CalciteRulesManager

* fixup! Move Rules to CalciteRulesManager
  • Loading branch information
rohangarg committed Jul 6, 2022
1 parent 49fefff commit d732de99486988a4311e1bf5621d4a6d5511b950
Showing 14 changed files with 155 additions and 33 deletions.
@@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
@@ -39,6 +40,7 @@
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.QueryLookupOperatorConversion;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
@@ -431,7 +433,8 @@ public void setup()
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
);
}

@@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
@@ -37,6 +38,7 @@
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.sql.calcite.SqlVectorizedExpressionSanityTest;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -317,7 +319,8 @@ public void setup()
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
);

try {
@@ -19,6 +19,7 @@

package org.apache.druid.benchmark.query;

import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -38,6 +39,7 @@
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
@@ -121,7 +123,8 @@ public void setup()
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
);
groupByQuery = GroupByQuery
.builder()
@@ -22,9 +22,11 @@
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.Multibinder;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider;
import org.apache.druid.sql.calcite.run.NativeQueryMakerFactory;
import org.apache.druid.sql.calcite.run.QueryMakerFactory;

@@ -54,5 +56,6 @@ public void configure(Binder binder)

binder.bind(PlannerFactory.class).in(LazySingleton.class);
binder.bind(DruidOperatorTable.class).in(LazySingleton.class);
Multibinder.newSetBinder(binder, ExtensionCalciteRuleProvider.class);
}
}
@@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.planner;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.plan.RelOptLattice;
import org.apache.calcite.plan.RelOptMaterialization;
@@ -78,13 +79,15 @@
import org.apache.druid.sql.calcite.rule.DruidRelToDruidRule;
import org.apache.druid.sql.calcite.rule.DruidRules;
import org.apache.druid.sql.calcite.rule.DruidTableScanRule;
import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider;
import org.apache.druid.sql.calcite.rule.FilterJoinExcludePushToChildRule;
import org.apache.druid.sql.calcite.rule.ProjectAggregatePruneUnusedCallRule;
import org.apache.druid.sql.calcite.rule.SortCollapseRule;

import java.util.List;
import java.util.Set;

public class Rules
public class CalciteRulesManager
{
public static final int DRUID_CONVENTION_RULES = 0;
public static final int BINDABLE_CONVENTION_RULES = 1;
@@ -95,7 +98,7 @@
// Calcite 1.23.0 fixes this issue by not consider expression as reduced if this case happens. However, while
// we are still using Calcite 1.21.0, a workaround is to limit the number of pattern matches to avoid infinite loop.
private static final String HEP_DEFAULT_MATCH_LIMIT_CONFIG_STRING = "druid.sql.planner.hepMatchLimit";
private static final int HEP_DEFAULT_MATCH_LIMIT = Integer.valueOf(
private final int HEP_DEFAULT_MATCH_LIMIT = Integer.valueOf(
System.getProperty(HEP_DEFAULT_MATCH_LIMIT_CONFIG_STRING, "1200")
);

@@ -107,7 +110,7 @@
// functions).
// 3) JoinCommuteRule (we don't support reordering joins yet).
// 4) JoinPushThroughJoinRule (we don't support reordering joins yet).
private static final List<RelOptRule> BASE_RULES =
private final List<RelOptRule> BASE_RULES =
ImmutableList.of(
AggregateStarTableRule.INSTANCE,
AggregateStarTableRule.INSTANCE2,
@@ -130,7 +133,7 @@
);

// Rules for scanning via Bindable, embedded directly in RelOptUtil's registerDefaultRules.
private static final List<RelOptRule> DEFAULT_BINDABLE_RULES =
private final List<RelOptRule> DEFAULT_BINDABLE_RULES =
ImmutableList.of(
Bindables.BINDABLE_TABLE_SCAN_RULE,
ProjectTableScanRule.INSTANCE,
@@ -142,7 +145,7 @@
// 1) ReduceExpressionsRule.JOIN_INSTANCE
// Removed by https://github.com/apache/druid/pull/9941 due to issue in https://github.com/apache/druid/issues/9942
// TODO: Re-enable when https://github.com/apache/druid/issues/9942 is fixed
private static final List<RelOptRule> REDUCTION_RULES =
private final List<RelOptRule> REDUCTION_RULES =
ImmutableList.of(
ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.FILTER_INSTANCE,
@@ -158,7 +161,7 @@
// Omit DateRangeRules due to https://issues.apache.org/jira/browse/CALCITE-1601
// Omit UnionMergeRule since it isn't very effective given how Druid unions currently operate and is potentially
// expensive in terms of planning time.
private static final List<RelOptRule> ABSTRACT_RULES =
private final List<RelOptRule> ABSTRACT_RULES =
ImmutableList.of(
AggregateProjectPullUpConstantsRule.INSTANCE2,
UnionPullUpConstantsRule.INSTANCE,
@@ -186,7 +189,7 @@
// 4) FilterJoinRule.FILTER_ON_JOIN and FilterJoinRule.JOIN
// Removed by https://github.com/apache/druid/pull/9773 due to issue in https://github.com/apache/druid/issues/9843
// TODO: Re-enable when https://github.com/apache/druid/issues/9843 is fixed
private static final List<RelOptRule> ABSTRACT_RELATIONAL_RULES =
private final List<RelOptRule> ABSTRACT_RELATIONAL_RULES =
ImmutableList.of(
AbstractConverter.ExpandConversionRule.INSTANCE,
AggregateRemoveRule.INSTANCE,
@@ -198,15 +201,21 @@
SortRemoveRule.INSTANCE
);

private Rules()
private final Set<ExtensionCalciteRuleProvider> extensionCalciteRuleProviderSet;

/**
* Manages the rules for planning of SQL queries via Calcite. Also provides methods for extensions to provide custom
* rules for planning.
* @param extensionCalciteRuleProviderSet the set of custom rules coming from extensions
*/
@Inject
public CalciteRulesManager(final Set<ExtensionCalciteRuleProvider> extensionCalciteRuleProviderSet)
{
// No instantiation.
this.extensionCalciteRuleProviderSet = extensionCalciteRuleProviderSet;
}

public static List<Program> programs(final PlannerContext plannerContext)
public List<Program> programs(final PlannerContext plannerContext)
{


// Program that pre-processes the tree before letting the full-on VolcanoPlanner loose.
final Program preProgram =
Programs.sequence(
@@ -221,10 +230,12 @@ public static List<Program> programs(final PlannerContext plannerContext)
);
}

private static Program buildHepProgram(Iterable<? extends RelOptRule> rules,
boolean noDag,
RelMetadataProvider metadataProvider,
int matchLimit)
public Program buildHepProgram(
final Iterable<? extends RelOptRule> rules,
final boolean noDag,
final RelMetadataProvider metadataProvider,
final int matchLimit
)
{
final HepProgramBuilder builder = HepProgram.builder();
builder.addMatchLimit(matchLimit);
@@ -234,7 +245,7 @@ private static Program buildHepProgram(Iterable<? extends RelOptRule> rules,
return Programs.of(builder.build(), noDag, metadataProvider);
}

private static List<RelOptRule> druidConventionRuleSet(final PlannerContext plannerContext)
public List<RelOptRule> druidConventionRuleSet(final PlannerContext plannerContext)
{
final ImmutableList.Builder<RelOptRule> retVal = ImmutableList
.<RelOptRule>builder()
@@ -245,10 +256,13 @@ private static List<RelOptRule> druidConventionRuleSet(final PlannerContext plan
.add(new ExternalTableScanRule(plannerContext))
.addAll(DruidRules.rules(plannerContext));

for (ExtensionCalciteRuleProvider extensionCalciteRuleProvider : extensionCalciteRuleProviderSet) {
retVal.add(extensionCalciteRuleProvider.getRule(plannerContext));
}
return retVal.build();
}

private static List<RelOptRule> bindableConventionRuleSet(final PlannerContext plannerContext)
public List<RelOptRule> bindableConventionRuleSet(final PlannerContext plannerContext)
{
return ImmutableList.<RelOptRule>builder()
.addAll(baseRuleSet(plannerContext))
@@ -258,7 +272,7 @@ private static List<RelOptRule> bindableConventionRuleSet(final PlannerContext p
.build();
}

private static List<RelOptRule> baseRuleSet(final PlannerContext plannerContext)
public List<RelOptRule> baseRuleSet(final PlannerContext plannerContext)
{
final PlannerConfig plannerConfig = plannerContext.getPlannerConfig();
final ImmutableList.Builder<RelOptRule> rules = ImmutableList.builder();
@@ -315,7 +315,7 @@ private PlannerResult planWithDruidConvention(

RelNode parameterized = rewriteRelDynamicParameters(possiblyLimitedRoot.rel);
final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(
Rules.DRUID_CONVENTION_RULES,
CalciteRulesManager.DRUID_CONVENTION_RULES,
planner.getEmptyTraitSet()
.replace(DruidConvention.instance())
.plus(root.collation),
@@ -362,7 +362,7 @@ private PlannerResult planWithBindableConvention(
) throws RelConversionException
{
BindableRel bindableRel = (BindableRel) planner.transform(
Rules.BINDABLE_CONVENTION_RULES,
CalciteRulesManager.BINDABLE_CONVENTION_RULES,
planner.getEmptyTraitSet().replace(BindableConvention.INSTANCE).plus(root.collation),
root.rel
);
@@ -72,6 +72,7 @@
private final ObjectMapper jsonMapper;
private final AuthorizerMapper authorizerMapper;
private final String druidSchemaName;
private final CalciteRulesManager calciteRuleManager;

@Inject
public PlannerFactory(
@@ -82,7 +83,8 @@ public PlannerFactory(
final PlannerConfig plannerConfig,
final AuthorizerMapper authorizerMapper,
final @Json ObjectMapper jsonMapper,
final @DruidSchemaName String druidSchemaName
final @DruidSchemaName String druidSchemaName,
final CalciteRulesManager calciteRuleManager
)
{
this.rootSchema = rootSchema;
@@ -93,6 +95,7 @@ public PlannerFactory(
this.authorizerMapper = authorizerMapper;
this.jsonMapper = jsonMapper;
this.druidSchemaName = druidSchemaName;
this.calciteRuleManager = calciteRuleManager;
}

/**
@@ -163,7 +166,7 @@ private FrameworkConfig buildFrameworkConfig(PlannerContext plannerContext)
.traitDefs(ConventionTraitDef.INSTANCE, RelCollationTraitDef.INSTANCE)
.convertletTable(new DruidConvertletTable(plannerContext))
.operatorTable(operatorTable)
.programs(Rules.programs(plannerContext))
.programs(calciteRuleManager.programs(plannerContext))
.executor(new DruidRexExecutor(plannerContext))
.typeSystem(DruidTypeSystem.INSTANCE)
.defaultSchema(rootSchema.getSubSchema(druidSchemaName))
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.rule;

import org.apache.calcite.plan.RelOptRule;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.sql.calcite.planner.PlannerContext;

/**
* This interface provides a way to supply custom calcite planning rules from extensions. All the custom rules are
* collected and supplied to the planner which invokes {@link ExtensionCalciteRuleProvider#getRule(PlannerContext)}
* for each of the rule provider per query.
*/
@UnstableApi
public interface ExtensionCalciteRuleProvider
{
RelOptRule getRule(PlannerContext plannerContext);
}
@@ -67,6 +67,7 @@
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -219,6 +220,7 @@ public void configure(Binder binder)
.in(LazySingleton.class);
binder.bind(QueryMakerFactory.class).to(NativeQueryMakerFactory.class);
binder.bind(new TypeLiteral<Supplier<DefaultQueryConfig>>(){}).toInstance(Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())));
binder.bind(CalciteRulesManager.class).toInstance(new CalciteRulesManager(ImmutableSet.of()));
}
}
)
@@ -906,7 +908,8 @@ public int getMaxRowsPerFrame()
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
)
),
smallFrameConfig,
@@ -996,7 +999,8 @@ public int getMinRowsPerFrame()
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
)
),
smallFrameConfig,

0 comments on commit d732de9

Please sign in to comment.