Decouple logical planning and native query generation in SQL planning#14232

Merged
cheddar merged 12 commits into apache:master from rohangarg:planner
Jun 19, 2023
Conversation

@rohangarg
Member

This patch starts the process of decoupling logical planning and native query generation in SQL planning. Currently, DAG optimization and native query generation are done simultaneously in the planner via Calcite optimization and conversion rules.
That coupling makes it difficult to obtain a logical DAG from the planner when one is needed, and it also makes debugging and logging for erroneous queries very hard to do or comprehend.
The patch decouples the two by letting logical planning happen in the Calcite planner, which generates a DAG of RelNode nodes in a defined logical Druid convention. After that, the DAG is converted to a native query by a visitor over the produced RelNode tree.
The feature is gated behind a flag, plannerStrategy, which has two modes: COUPLED and DECOUPLED. The default mode is COUPLED, meaning planning behaves the same as before; the new planning mode is enabled by setting the mode to DECOUPLED.
The feature is still WIP: some operators (like join/window/union) haven't been rewritten to plan through the new mechanism, and the operators that have been converted (scan/filter/project/sort/aggregate) still have a few test failures in corner cases. Both are planned to be fixed in future patches.
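
For reference, the PR text says the new mode is enabled by setting plannerStrategy to DECOUPLED. Assuming it is passed like any other query context parameter on the standard SQL API, an opt-in request might look like this (illustrative request body, not taken from the patch):

```json
{
  "query": "SELECT dim1, COUNT(*) FROM druid.foo GROUP BY dim1",
  "context": {
    "plannerStrategy": "DECOUPLED"
  }
}
```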

@rohangarg rohangarg requested a review from cheddar May 9, 2023 06:26
}

@Override
public boolean equals(Object obj)

Check notice

Code scanning / CodeQL

Confusing overloading of methods

Method DruidVolcanoCost.equals(..) could be confused with overloaded method [equals](1), since dispatch depends on static types.
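
A minimal sketch of the hazard this notice describes (hypothetical class, not Druid's actual DruidVolcanoCost): when an `equals(Cost)` overload sits next to the `equals(Object)` override, the method invoked depends on the argument's *static* type, so the same pair of objects can compare differently depending on how the call site is typed.

```java
public class CostEqualsDemo {
    static final class Cost {
        final double rows;
        Cost(double rows) { this.rows = rows; }

        // Overload: tolerant epsilon comparison, chosen when the argument's
        // static type is Cost.
        public boolean equals(Cost other) {
            return other != null && Math.abs(this.rows - other.rows) < 1e-3;
        }

        // Override: exact comparison, chosen when the static type is Object.
        @Override
        public boolean equals(Object obj) {
            return obj instanceof Cost && this.rows == ((Cost) obj).rows;
        }

        @Override
        public int hashCode() { return Double.hashCode(rows); }
    }

    static boolean viaCost(Cost a, Cost b) { return a.equals(b); }     // dispatches to equals(Cost)
    static boolean viaObject(Cost a, Object b) { return a.equals(b); } // dispatches to equals(Object)

    public static void main(String[] args) {
        Cost a = new Cost(100.0);
        Cost b = new Cost(100.0001);
        System.out.println(viaCost(a, b));   // true: epsilon overload
        System.out.println(viaObject(a, b)); // false: exact override
    }
}
```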
return queryList;
}

public List<DruidTable> getQueryTables()

Check notice

Code scanning / CodeQL

Exposing internal representation

getQueryTables exposes the internal representation stored in field queryTables. The value may be modified [after this call to getQueryTables](1).
return partialDruidQuery;
}

public List<PartialDruidQuery> getQueryList()

Check notice

Code scanning / CodeQL

Exposing internal representation

getQueryList exposes the internal representation stored in field queryList. The value may be modified [after this call to getQueryList](1).
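
Both "exposing internal representation" notices (getQueryTables and getQueryList) point at the same pattern. A hypothetical sketch of the usual fix: return an unmodifiable view (or a copy) so callers cannot mutate the holder's internal list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class QueryHolder {
    private final List<String> queryList = new ArrayList<>();

    public void record(String query) { queryList.add(query); }

    // Leaky: the caller holds the same mutable list the holder uses internally.
    public List<String> getQueryListLeaky() { return queryList; }

    // Safe: read-only view; mutation attempts throw UnsupportedOperationException.
    public List<String> getQueryList() { return Collections.unmodifiableList(queryList); }

    public static void main(String[] args) {
        QueryHolder holder = new QueryHolder();
        holder.record("scan");
        holder.getQueryListLeaky().add("oops");    // silently corrupts internal state
        System.out.println(holder.getQueryList()); // [scan, oops]
    }
}
```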
@Override
public RelNode visit(RelNode other)
{
if (other instanceof TableScan) {

Check notice

Code scanning / CodeQL

Chain of 'instanceof' tests

This if block performs a chain of 6 type tests - consider alternatives, e.g. polymorphism or the visitor pattern.
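
A hypothetical sketch of the alternative the notice suggests (illustrative node types, not Druid's actual classes): double dispatch through a small visitor interface, so each node type routes itself and the compiler flags any unhandled type instead of an instanceof chain silently falling through.

```java
public class VisitorDemo {
    interface NodeVisitor<T> {
        T visitScan(Scan scan);
        T visitFilter(Filter filter);
    }

    abstract static class Node {
        abstract <T> T accept(NodeVisitor<T> visitor);
    }

    static final class Scan extends Node {
        @Override <T> T accept(NodeVisitor<T> v) { return v.visitScan(this); }
    }

    static final class Filter extends Node {
        @Override <T> T accept(NodeVisitor<T> v) { return v.visitFilter(this); }
    }

    static String describe(Node node) {
        // No instanceof chain: the node picks the right visitor method.
        return node.accept(new NodeVisitor<String>() {
            @Override public String visitScan(Scan s) { return "scan"; }
            @Override public String visitFilter(Filter f) { return "filter"; }
        });
    }

    public static void main(String[] args) {
        System.out.println(describe(new Scan()));   // scan
        System.out.println(describe(new Filter())); // filter
    }
}
```

In this PR's context the flagged chain lives in a RelNode visit method, where Calcite's own shuttle interfaces constrain the design, so this is a general illustration of the checker's suggestion rather than a drop-in fix.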
/**
* {@link DruidLogicalNode} convention node for {@link Aggregate} plan node.
*/
public class DruidAggregate extends Aggregate implements DruidLogicalNode

Check failure

Code scanning / CodeQL

No clone method

No clone method, yet implements Cloneable.
/**
* {@link DruidLogicalNode} convention node for {@link TableScan} plan node.
*/
public class DruidTableScan extends TableScan implements DruidLogicalNode

Check failure

Code scanning / CodeQL

No clone method

No clone method, yet implements Cloneable.
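
These two failures likely stem from the Cloneable marker being inherited through Calcite's RelNode hierarchy rather than added by this patch; Calcite nodes conventionally rebuild themselves via an explicit copy method (copy(traitSet, inputs)) instead of Object.clone(). An illustrative sketch of that convention (hypothetical class, not Calcite code):

```java
public class CopyDemo {
    static final class Node implements Cloneable {
        final String name;
        final int cost;
        Node(String name, int cost) { this.name = name; this.cost = cost; }

        // Explicit copy: no clone() plumbing, no CloneNotSupportedException,
        // and individual fields can be swapped while copying.
        Node copy(int newCost) { return new Node(name, newCost); }
    }

    public static void main(String[] args) {
        Node scan = new Node("scan", 10);
        Node cheaper = scan.copy(5);
        System.out.println(cheaper.name + ":" + cheaper.cost); // scan:5
    }
}
```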
@abhishekagarwal87
Contributor

Hi Rohan - can you write a separate GH proposal that adds the motivation behind these changes with a bit more details? Seems like this is the first PR towards that end goal, you have in mind. The proposal can also include a short overview of the upcoming changes.

@clintropolis
Member

+1 for design proposal

@imply-cheddar
Contributor

imply-cheddar commented Jun 19, 2023

There are requests for a design doc. The description of this PR attempts to lay out the explanation (though it's very terse and might be hard to understand if you aren't familiar with the problems and work that has been done).

The basic parts of this PR:

  1. It introduces a new way of planning SQL. This is added as yet another method of planning and doesn't impact the current methods of planning.
  2. You opt in to the new method of planning by setting a context parameter (it's also very verbose right now in terms of logging and cannot yet handle all queries).

The new method "decouples" planning of native queries into a 2-phase process

  1. SQL -> DAG
  2. DAG -> native query

The DAG is returned from the SQL planner and then that DAG is used to build the native query.
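
The two-phase flow above can be sketched as follows (interface names are illustrative, not Druid's actual API): phase 1 stops at a logical DAG, phase 2 converts that DAG to a native query, and each phase's output can be inspected, logged, or tested on its own.

```java
public class DecoupledPlanning {
    interface LogicalNode { }   // stand-in for DruidLogicalNode RelNodes
    interface NativeQuery { }

    interface SqlPlanner { LogicalNode plan(String sql); }                // phase 1: SQL -> DAG
    interface NativeQueryMaker { NativeQuery convert(LogicalNode dag); }  // phase 2: DAG -> native query

    static NativeQuery planDecoupled(SqlPlanner planner, NativeQueryMaker maker, String sql) {
        LogicalNode dag = planner.plan(sql); // failures here are planning/rule problems
        return maker.convert(dag);           // failures here are conversion problems
    }
}
```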

The reason that we are doing this is to make it easier to iterate and work with SQL for Druid. Druid's native query planning follows a relatively uncommon pattern of building up the physical execution plan as the output of the volcano planner. On the plus side, this was done so that we could leverage the volcano planner to explore different methods of physical planning in case something wasn't natively plannable. On the negative side, this makes it incredibly difficult to actually work with the SQL planning as it exists in Druid. This is seen through the following symptoms:

  1. There is a class of errors for which it is nigh-impossible to generate good error messages. This is because an error could just mean "volcano planner, explore something else" or it could mean "user person, you did something wrong, re-write your query". The fundamental source of these poor error messages is that we have coupled the physical execution conversion with the planning. The current code tries to work around this by allowing you to set a possible error, which the planner will surface. This "possible error" is simply the last error message that caused the volcano planner to stop, which might or might not be related to the real issue; a lot of the time, it is not related at all. Instead of working around the root cause (the coupling of the planning with the building of the execution), we should decouple things.
  2. When a query fails to plan, but we think it should be plannable, it is extremely difficult to figure out whether it is an issue with the rules (i.e. we need a rule that orders nodes differently) or with the physical query planning (i.e. something in PartialDruidQuery and peripheral classes needs updating). The "normal" debugging process would be: (1) is the SQL turning into a "good" DAG? If yes, figure out why that "good" DAG is not being converted; if no, figure out why the SQL is not turning into a "good" DAG. This is, in fact, the process we normally follow, but we have to do it by setting a breakpoint in the volcano planner code and inspecting all of the plans it explores to figure out whether there's a good one in there. Instead of needing to fire up a debugger to make that first-degree split, we should be able to do it directly. That is, the normal flow of the code could be SQL -> DAG -> native query, with an object representing each of these, which is exactly what the decoupling accomplishes.
  3. It is very difficult to identify and understand which patterns of logical DAG are not runnable by native Druid queries. Given that the current code uses "I cannot run that" as a way of telling the planner to explore other things, it is not clear which combinations of DAG elements lead to a failure that can be fixed by re-arranging things vs. which combinations will never be convertible. This makes it quite difficult to inventory what Druid's native queries can and cannot do when running SQL. The decoupling eliminates this: the first pass of SQL -> DAG should succeed regardless of whether a Druid native query can be generated, and when the generated DAG gets converted to a native query, the conversion code can identify that a specific set of DAG elements is not convertible and generate an error that explains why. So, what happens when we get an error saying we cannot plan a specific query, but we think it should be plannable? We follow the normal debug procedure: evaluate the generated DAG and decide whether it is a "good" DAG for that input SQL; if it is a "bad" DAG, figure out what rules will help generate a "good" one; if it is a "good" DAG, change the conversion logic so that it stops throwing an exception.
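
The error-reporting benefit in point 3 can be illustrated with a toy converter (illustrative only, not the PR's code): once conversion is a separate step, an unconvertible DAG shape can fail with a message naming the actual problem, rather than surfacing the volcano planner's last exploration error.

```java
public class ConversionDemo {
    static String convert(String nodeKind) {
        switch (nodeKind) {
            case "scan":
            case "project":
            case "aggregate":
                return "native:" + nodeKind;
            default:
                // The message points directly at the unsupported DAG element.
                throw new UnsupportedOperationException(
                    "Cannot convert logical node [" + nodeKind + "] to a native query");
        }
    }

    public static void main(String[] args) {
        System.out.println(convert("aggregate")); // native:aggregate
    }
}
```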

The change itself does not impact current behavior and is being done as an add-on. Once it passes all of the existing tests, we will be able to switch the default planning mode over to it and deprecate the old native query planning. Given that this isn't actually changing behaviors and isn't fundamentally changing any public API, I believe it should be safe to iterate on this work quite rapidly.

@cheddar
Contributor

cheddar commented Jun 19, 2023

I validated that this won't impact the current code paths, it requires a specific context parameter to be set to opt-in and will not impact any current behaviors. As such, I'm going to go ahead and merge this so that we can get it in and keep iterating on making more tests pass.

@cheddar cheddar merged commit 09d6c5a into apache:master Jun 19, 2023
@abhishekagarwal87
Contributor

Those problems are indeed relatable. I am wondering if the new design will help with some other problems that you didn't intend to solve immediately. The one thing I had in mind was plan caching. Plan generation can be costly and sometimes forces us to switch to native queries for high-QPS use cases. I am wondering if that will be easier to do in this new decoupled mode, where Druid conversion happens after the DAG is generated.

@abhishekagarwal87
Contributor

@rohangarg - also what does that intermediate DAG look like? can you post an example?

@rohangarg
Member Author

@abhishekagarwal87: An example of the intermediate DAG looks like:

DruidSort(sort0=[$2], sort1=[$0], dir0=[DESC], dir1=[ASC], druid=[logical]): rowcount = 100.0, cumulative cost = {201.0 rows, 1067.0 cpu, 0.0 io}, id = 136
  DruidProject(dim1=[$0], EXPR$1=[SUBSTRING($0, 2)], EXPR$2=[CHAR_LENGTH($0)], druid=[logical]): rowcount = 100.0, cumulative cost = {101.0 rows, 67.0 cpu, 0.0 io}, id = 135
    DruidAggregate(group=[{1}], druid=[logical]): rowcount = 100.0, cumulative cost = {101.0 rows, 17.0 cpu, 0.0 io}, id = 134
      DruidTableScan(table=[[druid, foo]], druid=[logical]): rowcount = 1000.0, cumulative cost = {tiny}, id = 130

This is for the SQL query:

SELECT
  dim1, SUBSTRING(dim1, 2)
FROM druid.foo
GROUP BY dim1
ORDER BY CHARACTER_LENGTH(dim1) DESC, dim1

@abhishekagarwal87 abhishekagarwal87 added this to the 27.0 milestone Jul 19, 2023
