CONCAT flattening, filter decomposition. (#15634)
* CONCAT flattening, filter decomposition.

Flattening: CONCAT(CONCAT(x, y), z) is flattened to CONCAT(x, y, z). This
is especially useful for the || operator, which is binary and therefore
produces nested, non-flat CONCAT calls (see the first sketch following
these notes).

Filter decomposition: transforms CONCAT(x, '-', y) = 'a-b' into
x = 'a' AND y = 'b' (see the second sketch following these notes).

* One more test.

* Fix two tests.

* Adjustments from review.

* Fix empty string problem, add tests.
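
A minimal sketch of the flattening idea, written as a Calcite RexShuttle. This is an illustration only, not the commit's actual FlattenConcatRule: the class name, the name-based operator check, and the use of a bare shuttle rather than a planner rule are all assumptions.

import java.util.ArrayList;
import java.util.List;

import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;

// Hypothetical illustration; not the FlattenConcatRule added by this commit.
class ConcatFlatteningShuttle extends RexShuttle
{
  private final RexBuilder rexBuilder;

  ConcatFlatteningShuttle(final RexBuilder rexBuilder)
  {
    this.rexBuilder = rexBuilder;
  }

  @Override
  public RexNode visitCall(final RexCall call)
  {
    // Operands are visited first, so nested CONCATs are already flat
    // by the time the enclosing call is inspected.
    final RexCall visited = (RexCall) super.visitCall(call);

    // Per the commit message, || plans as CONCAT, so one name check covers both.
    if (!"CONCAT".equals(visited.getOperator().getName())) {
      return visited;
    }

    final List<RexNode> flattened = new ArrayList<>();
    for (final RexNode operand : visited.getOperands()) {
      if (operand instanceof RexCall
          && "CONCAT".equals(((RexCall) operand).getOperator().getName())) {
        // Hoist the nested call's operands into the outer call.
        flattened.addAll(((RexCall) operand).getOperands());
      } else {
        flattened.add(operand);
      }
    }

    return rexBuilder.makeCall(visited.getOperator(), flattened);
  }
}

Because operands are rewritten before the enclosing call, one pass suffices for arbitrarily deep nesting: x || y || z parses as CONCAT(CONCAT(x, y), z) and comes out as CONCAT(x, y, z).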
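The decomposition idea, reduced to plain strings (a hypothetical helper, not the commit's FilterDecomposeConcatRule, which operates on RexNodes): split the comparison literal on the literal separator, and refuse the rewrite when the split is ambiguous.

import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the string split at the heart of the rewrite.
final class ConcatFilterDecomposition
{
  /**
   * Given CONCAT(x, separator, y) = target, recover the values x and y must
   * take, or return null if no unambiguous rewrite exists.
   */
  static List<String> decompose(final String target, final String separator)
  {
    final int i = target.indexOf(separator);

    // The separator must occur exactly once in the target; otherwise more
    // than one split would match and the filter must be left as-is.
    if (i < 0 || target.indexOf(separator, i + 1) >= 0) {
      return null;
    }

    return Arrays.asList(
        target.substring(0, i),
        target.substring(i + separator.length())
    );
  }
}

Here decompose("a-b", "-") yields ["a", "b"], so CONCAT(x, '-', y) = 'a-b' becomes x = 'a' AND y = 'b'. decompose("a-b-c", "-") yields null, since x = 'a', y = 'b-c' and x = 'a-b', y = 'c' would both match, so the original filter is kept. Note that in this sketch an empty string is a valid part: decompose("-b", "-") gives ["", "b"].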
gianm committed Jan 11, 2024
1 parent 2231cb3 commit 6c18434
Showing 8 changed files with 1,066 additions and 58 deletions.
@@ -28,7 +28,7 @@

public class ConcatOperatorConversion extends DirectOperatorConversion
{
-private static final SqlFunction SQL_FUNCTION = OperatorConversions
+public static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder("CONCAT")
.operandTypeChecker(OperandTypes.SAME_VARIADIC)
.returnTypeCascadeNullable(SqlTypeName.VARCHAR)
@@ -28,7 +28,7 @@

public class TextcatOperatorConversion extends DirectOperatorConversion
{
-private static final SqlFunction SQL_FUNCTION = OperatorConversions
+public static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder("textcat")
.operandTypes(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)
.requiredOperandCount(2)
@@ -31,6 +31,7 @@
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.plan.volcano.AbstractConverter;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
@@ -55,13 +56,16 @@
import org.apache.druid.sql.calcite.rule.DruidTableScanRule;
import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider;
import org.apache.druid.sql.calcite.rule.FilterDecomposeCoalesceRule;
+import org.apache.druid.sql.calcite.rule.FilterDecomposeConcatRule;
import org.apache.druid.sql.calcite.rule.FilterJoinExcludePushToChildRule;
+import org.apache.druid.sql.calcite.rule.FlattenConcatRule;
import org.apache.druid.sql.calcite.rule.ProjectAggregatePruneUnusedCallRule;
import org.apache.druid.sql.calcite.rule.SortCollapseRule;
import org.apache.druid.sql.calcite.rule.logical.DruidLogicalRules;
import org.apache.druid.sql.calcite.run.EngineFeature;

import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Set;

@@ -88,7 +92,7 @@ public class CalciteRulesManager
* 3) {@link CoreRules#JOIN_COMMUTE}, {@link JoinPushThroughJoinRule#RIGHT}, {@link JoinPushThroughJoinRule#LEFT},
* and {@link CoreRules#FILTER_INTO_JOIN}, which are part of {@link #FANCY_JOIN_RULES}.
* 4) {@link CoreRules#PROJECT_FILTER_TRANSPOSE} because PartialDruidQuery would like to have the Project on top of the Filter -
-this rule could create a lot of non-usefull plans.
+this rule could create a lot of non-useful plans.
*/
private static final List<RelOptRule> BASE_RULES =
ImmutableList.of(
@@ -228,50 +232,87 @@ public CalciteRulesManager(final Set<ExtensionCalciteRuleProvider> extensionCalc
public List<Program> programs(final PlannerContext plannerContext)
{
final boolean isDebug = plannerContext.queryContext().isDebug();

-// Program that pre-processes the tree before letting the full-on VolcanoPlanner loose.
-final List<Program> prePrograms = new ArrayList<>();
-prePrograms.add(new LoggingProgram("Start", isDebug));
-prePrograms.add(Programs.subQuery(DefaultRelMetadataProvider.INSTANCE));
-prePrograms.add(new LoggingProgram("Finished subquery program", isDebug));
-prePrograms.add(DecorrelateAndTrimFieldsProgram.INSTANCE);
-prePrograms.add(new LoggingProgram("Finished decorrelate and trim fields program", isDebug));
-prePrograms.add(buildCoalesceProgram());
-prePrograms.add(new LoggingProgram("Finished coalesce program", isDebug));
-prePrograms.add(buildReductionProgram(plannerContext));
-prePrograms.add(new LoggingProgram("Finished expression reduction program", isDebug));
-
-final Program preProgram = Programs.sequence(prePrograms.toArray(new Program[0]));
+final Program druidPreProgram = buildPreProgram(plannerContext, true);
+final Program bindablePreProgram = buildPreProgram(plannerContext, false);

return ImmutableList.of(
Programs.sequence(
-preProgram,
+druidPreProgram,
Programs.ofRules(druidConventionRuleSet(plannerContext)),
new LoggingProgram("After Druid volcano planner program", isDebug)
),
Programs.sequence(
-preProgram,
+bindablePreProgram,
Programs.ofRules(bindableConventionRuleSet(plannerContext)),
new LoggingProgram("After bindable volcano planner program", isDebug)
),
Programs.sequence(
-preProgram,
+druidPreProgram,
Programs.ofRules(logicalConventionRuleSet(plannerContext)),
new LoggingProgram("After logical volcano planner program", isDebug)
)
);
}

-private Program buildReductionProgram(final PlannerContext plannerContext)
+/**
+* Build the program that runs prior to the cost-based {@link VolcanoPlanner}.
+*
+* @param plannerContext planner context
+* @param isDruid whether this is a Druid program
+*/
+private Program buildPreProgram(final PlannerContext plannerContext, final boolean isDruid)
{
+final boolean isDebug = plannerContext.queryContext().isDebug();
+
+// Program that pre-processes the tree before letting the full-on VolcanoPlanner loose.
+final List<Program> prePrograms = new ArrayList<>();
+prePrograms.add(new LoggingProgram("Start", isDebug));
+prePrograms.add(Programs.subQuery(DefaultRelMetadataProvider.INSTANCE));
+prePrograms.add(new LoggingProgram("Finished subquery program", isDebug));
+prePrograms.add(DecorrelateAndTrimFieldsProgram.INSTANCE);
+prePrograms.add(new LoggingProgram("Finished decorrelate and trim fields program", isDebug));
+prePrograms.add(buildReductionProgram(plannerContext, isDruid));
+prePrograms.add(new LoggingProgram("Finished expression reduction program", isDebug));
+
+return Programs.sequence(prePrograms.toArray(new Program[0]));
+}
+
+/**
+* Builds an expression reduction program using {@link #REDUCTION_RULES} (built-in to Calcite) plus some
+* Druid-specific rules.
+*/
+private Program buildReductionProgram(final PlannerContext plannerContext, final boolean isDruid)
+{
-List<RelOptRule> hepRules = new ArrayList<RelOptRule>(REDUCTION_RULES);
+final List<RelOptRule> hepRules = new ArrayList<>();
+
+if (isDruid) {
+// Must run before REDUCTION_RULES, since otherwise ReduceExpressionsRule#pushPredicateIntoCase may
+// make it impossible to convert to COALESCE.
+hepRules.add(new CaseToCoalesceRule());
+hepRules.add(new CoalesceLookupRule());
+
+// Flatten calls to CONCAT, which happen easily with the || operator since it only accepts two arguments.
+hepRules.add(new FlattenConcatRule());
+
+// Decompose filters on COALESCE to promote more usage of indexes.
+hepRules.add(new FilterDecomposeCoalesceRule());
+}
+
+// Calcite's builtin reduction rules.
+hepRules.addAll(REDUCTION_RULES);
+
+if (isDruid) {
+// Decompose filters on CONCAT to promote more usage of indexes. Runs after REDUCTION_RULES because
+// this rule benefits from reduction of effectively-literal calls to actual literals.
+hepRules.add(new FilterDecomposeConcatRule());
+}
+
// Apply CoreRules#FILTER_INTO_JOIN early to avoid exploring less optimal plans.
-if (plannerContext.getJoinAlgorithm().requiresSubquery()) {
+if (isDruid && plannerContext.getJoinAlgorithm().requiresSubquery()) {
hepRules.add(CoreRules.FILTER_INTO_JOIN);
}
-return buildHepProgram(
-hepRules
-);
+
+return buildHepProgram(hepRules);
}

private static class LoggingProgram implements Program
@@ -372,7 +413,13 @@ public List<RelOptRule> baseRuleSet(final PlannerContext plannerContext)
return rules.build();
}

-private static Program buildHepProgram(final Iterable<? extends RelOptRule> rules)
+/**
+* Build a {@link HepProgram} to apply rules mechanically as part of {@link #buildPreProgram}. Rules are applied
+* one-by-one.
+*
+* @param rules rules to apply
+*/
+private static Program buildHepProgram(final Collection<RelOptRule> rules)
{
final HepProgramBuilder builder = HepProgram.builder();
builder.addMatchLimit(CalciteRulesManager.HEP_DEFAULT_MATCH_LIMIT);
@@ -382,20 +429,6 @@ private static Program buildHepProgram(final Iterable<? extends RelOptRule> rule
return Programs.of(builder.build(), true, DefaultRelMetadataProvider.INSTANCE);
}

-/**
-* Program that performs various manipulations related to COALESCE.
-*/
-private static Program buildCoalesceProgram()
-{
-return buildHepProgram(
-ImmutableList.of(
-new CaseToCoalesceRule(),
-new CoalesceLookupRule(),
-new FilterDecomposeCoalesceRule()
-)
-);
-}

/**
* Based on Calcite's Programs.DecorrelateProgram and Programs.TrimFieldsProgram, which are private and only
* accessible through Programs.standard (which we don't want, since it also adds Enumerable rules).
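For context on buildHepProgram: a Calcite HepProgram applies its rules mechanically, in order, rather than exploring alternative plans by cost the way the VolcanoPlanner does. A standalone sketch of how such a program is typically driven (not code from this commit; the rule choice and match limit here are arbitrary):

import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;

class HepProgramExample
{
  // Applies a single reduction rule to a relational tree, with a cap on
  // total rule firings (compare HEP_DEFAULT_MATCH_LIMIT above).
  static RelNode reduce(final RelNode root)
  {
    final HepProgram program = HepProgram.builder()
        .addMatchLimit(1200)
        .addRuleInstance(CoreRules.FILTER_REDUCE_EXPRESSIONS)
        .build();
    final HepPlanner planner = new HepPlanner(program);
    planner.setRoot(root);
    return planner.findBestExp();
  }
}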

