Add pinot-query-planner module #8340

Merged

Contributor

@walterddr walterddr commented Mar 11, 2022

Summary

initial commit for the multi-stage query planner.

Design Doc

https://docs.google.com/document/d/10-vL_bUrI-Pi2oYudWyUlQl9Kf0cLrW-Z8hGczkCPik/edit#heading=h.f7j5q82j0slb

TODO

  1. Create a better-performing serialization format for StagePlan.
  2. Address type system and parser/validator TODOs to support all existing Pinot SQL.

@mcvsubbu
Contributor

Please add a link to the design doc in your PRs, thanks.

@walterddr walterddr changed the title from "query planner" to "Add pinot-query-planner module" Mar 14, 2022
@codecov-commenter

codecov-commenter commented Mar 16, 2022

Codecov Report

❗ No coverage uploaded for pull request base (multi_stage_query_engine@66de3ba).
The diff coverage is n/a.

@@                     Coverage Diff                     @@
##             multi_stage_query_engine    #8340   +/-   ##
===========================================================
  Coverage                            ?   30.47%           
===========================================================
  Files                               ?     1642           
  Lines                               ?    86111           
  Branches                            ?    12999           
===========================================================
  Hits                                ?    26246           
  Misses                              ?    57485           
  Partials                            ?     2380           
Flag Coverage Δ
integration1 28.63% <0.00%> (?)
integration2 27.20% <0.00%> (?)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 66de3ba...f8dab0c.

/**
* The {@code QueryEnvironment} contains the main entrypoint for query planning.
Contributor

@siddharthteotia siddharthteotia Mar 16, 2022

For completeness, and so that readers are aware, can you also add info to the javadoc on the mapping between QueryEnvironment and a SQL query? Is this created on a per-query basis?

Contributor Author

On second thought, this is actually a good question. Some of the components are not reusable. Let me rethink this design.

Contributor

@siddharthteotia siddharthteotia Mar 25, 2022

Discussed offline with @walterddr: QueryEnvironment is global, but planning has to be serialized at the Calcite planner level, so there is still a major TODO to make this scalable. It is probably OK to instantiate the planner on each call to planQuery() (and eat that cost) while keeping QueryEnvironment global to avoid instantiating the catalog per query.
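
A minimal sketch of the arrangement described in this comment, using hypothetical stand-in classes (`GlobalQueryEnvironment`, `Catalog`, `Planner`) rather than the real Pinot or Calcite types. It assumes the catalog is the expensive part and the planner is the non-reusable part; the counters exist only to make the instantiation pattern visible.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins; none of these are the real Pinot or Calcite classes.
final class GlobalQueryEnvironment {
  // Counters exist only so the sketch can show what gets instantiated, and how often.
  static final AtomicInteger CATALOGS_CREATED = new AtomicInteger();
  static final AtomicInteger PLANNERS_CREATED = new AtomicInteger();

  /** Shared state: built once for the lifetime of the environment. */
  static final class Catalog {
    Catalog() {
      CATALOGS_CREATED.incrementAndGet();
    }
  }

  /** Per-query state: built once per planning call and then discarded. */
  static final class Planner {
    private final Catalog _catalog;

    Planner(Catalog catalog) {
      _catalog = catalog;
      PLANNERS_CREATED.incrementAndGet();
    }

    String plan(String sql) {
      return "plan(" + sql + ")";
    }
  }

  private final Catalog _catalog = new Catalog();

  /** Each call pays for a new planner, so concurrent calls never share planner state. */
  String planQuery(String sql) {
    return new Planner(_catalog).plan(sql);
  }
}
```

With this shape, two `planQuery()` calls on one environment create one catalog and two planners, which is the cost trade-off the comment accepts.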

SqlNode validated = _validator.validate(parsed);
if (null == validated || !validated.getKind().belongsTo(SqlKind.QUERY)) {
throw new IllegalArgumentException(
String.format("unsupported SQL query, cannot validate out a valid sql from:\n%s", parsed));
Contributor

Can we include the original SQL query as well?

Contributor Author

The original query will be wrapped in the upper-level planQuery.

Contributor

Basically, I wanted the message to include the original SQL query string. Maybe SqlNode.toString() takes care of that?

Contributor Author

@walterddr walterddr Mar 25, 2022

Yes. Callers will not call validate directly. When the query goes through planQuery, the original query string is printed, because the exception is caught and rethrown with the sqlString attached to the message.
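
The catch-and-rethrow behaviour described in this reply can be sketched as follows. This is an illustration only: `PlanQuerySketch`, the string-based `validate` stand-in and the wording of both messages are assumptions, not the code in the PR.

```java
import java.util.Locale;

// Hypothetical sketch; the method names mirror the discussion but the bodies are invented.
final class PlanQuerySketch {
  /** Stand-in for the validation step: rejects anything that is not a SELECT. */
  static String validate(String parsed) {
    if (parsed == null || !parsed.trim().toUpperCase(Locale.ROOT).startsWith("SELECT")) {
      throw new IllegalArgumentException("unsupported SQL query, cannot validate: " + parsed);
    }
    return parsed.trim();
  }

  /** Entry point: any failure is rethrown with the original SQL string attached. */
  static String planQuery(String sqlQuery) {
    try {
      return "plan for: " + validate(sqlQuery);
    } catch (Exception e) {
      // The message text is an assumption; the point is that sqlQuery is part of it.
      throw new RuntimeException("Error composing query plan for: " + sqlQuery, e);
    }
  }
}
```

Because the wrapper keeps the inner exception as the cause, both the original SQL text and the validator's own message remain available to the caller.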

*
* <p>It provides the higher-level entry interface to convert a SQL string into a {@link QueryPlan}.
*/
public class QueryEnvironment {
Contributor

Should this be called PinotQueryPlanner or QueryPlanner, since it is not just an environment holder and does the entire planning?

Contributor Author

I called it an environment because it holds state during planning; it is not stateless. But let me think more about the naming (and the javadoc).

return this.createSqlType(SqlTypeName.VARCHAR);
case BYTES:
return this.createSqlType(SqlTypeName.VARBINARY);
case JSON:
Contributor

JSON is a recognized type in Pinot, so we should not throw UnsupportedOperationException for it?

Contributor Author

JSON will also be implemented as a Struct type, I suppose.

Contributor

A TODO is fine for now, I guess. We must handle JSON since it is a first-class type in Pinot now.

return builder.build();
}

private RelDataType toRelDataType(FieldSpec fieldSpec) {
Contributor

How do we handle arrays / multi-value (MV) columns?

Contributor Author

Calcite supports Map, Array and Struct types, but we throw here until operator support is added.
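
A rough sketch of the mapping behaviour discussed in the two threads above. Plain strings stand in for Calcite's SQL type names, and `TypeMappingSketch` and its enum are hypothetical. Only the STRING and BYTES mappings appear in the diff; the INT and LONG rows are assumptions added to round out the example.

```java
// Hypothetical sketch: plain strings stand in for Calcite's SqlTypeName / RelDataType.
final class TypeMappingSketch {
  enum PinotDataType { INT, LONG, STRING, BYTES, JSON }

  static String toSqlTypeName(PinotDataType dataType, boolean singleValue) {
    if (!singleValue) {
      // Multi-value columns could map to an ARRAY type once operators support it.
      throw new UnsupportedOperationException("multi-value column is not supported yet: " + dataType);
    }
    switch (dataType) {
      case INT:
        return "INTEGER";   // assumed mapping, not shown in the diff
      case LONG:
        return "BIGINT";    // assumed mapping, not shown in the diff
      case STRING:
        return "VARCHAR";
      case BYTES:
        return "VARBINARY";
      case JSON:
        // TODO: JSON is a first-class Pinot type and still needs a mapping.
      default:
        throw new UnsupportedOperationException("unsupported type: " + dataType);
    }
  }
}
```

Throwing for JSON and for multi-value columns keeps unsupported schemas from being planned silently with a wrong type.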

/**
* Extends Java-base TypeFactory from Calcite.
*/
public class TypeFactory extends JavaTypeFactoryImpl {
Contributor

@siddharthteotia siddharthteotia Mar 17, 2022

Not sure why we need to extend JavaTypeFactoryImpl instead of RelDataTypeFactory:

  • The interface is experimental and subject to change in the future, as per Calcite.
  • JavaTypeFactory is not experimental, but the purpose of that interface seems to be to map a type / recordType to a Java class? Why do we need that model?

Contributor Author

This is for convenience in implementing the type factory. But yes, we can definitely create our own clean implementation in a future optimization.

Contributor

Maybe add a TODO here?

public class Validator extends SqlValidatorImpl {

public Validator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, RelDataTypeFactory typeFactory) {
super(opTab, catalogReader, typeFactory, Config.DEFAULT);
Contributor

@siddharthteotia siddharthteotia Mar 17, 2022

The current CalciteSqlParser code in Pinot uses SqlConformanceEnum.BABEL and, IIUC, this was done during the migration from PQL to SQL to relax a few things in syntax and semantics.

Should we use BABEL here as well instead of DEFAULT?

Contributor Author

Given that PQL is deprecated, I don't think we should use BABEL solely because of this.

Contributor

@siddharthteotia siddharthteotia Mar 25, 2022

What I meant was that, after we migrated to SQL, we use BABEL conformance in the CalciteSqlParser code.

import org.apache.calcite.rel.RelDistribution;


public class MailboxSendNode extends AbstractStageNode {
Contributor

@siddharthteotia siddharthteotia Mar 17, 2022

Based on the discussion thread in design doc, I think we should have an abstraction of ExchangeNode. ExchangeNode should encapsulate sender and receiver node.

Similarly, there should be an abstraction for sender and receiver themselves.

Something like the following:

Exchange

  • BroadcastExchange
  • SingleMergeExchange
  • HashPartitionExchange

Sender

  • BroadcastSender
  • SingleSender
  • HashPartitionSender

Receiver

  • OrderedReceiver
  • UnorderedReceiver

BroadcastExchange encapsulates

  • BroadcastSender
  • SomeReceiver

HashPartitionExchange encapsulates

  • HashPartitionSender
  • SomeReceiver

So ideally MailboxSend and MailboxReceive should be modeled as sender and receiver abstractions respectively, as opposed to concrete implementations, imo.
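
The hierarchy proposed in this comment could look roughly like the sketch below. All type names are taken from the lists above or invented for illustration (`ExchangeHierarchySketch`, `describe()`); none of them exist in the PR, and only a subset of the listed senders and exchanges is shown.

```java
// Hypothetical sketch of the proposed abstractions; none of these types exist in the PR.
final class ExchangeHierarchySketch {
  interface Sender {
    String describe();
  }

  interface Receiver {
    String describe();
  }

  static final class BroadcastSender implements Sender {
    public String describe() {
      return "broadcast";
    }
  }

  static final class HashPartitionSender implements Sender {
    public String describe() {
      return "hash-partition";
    }
  }

  static final class OrderedReceiver implements Receiver {
    public String describe() {
      return "ordered";
    }
  }

  static final class UnorderedReceiver implements Receiver {
    public String describe() {
      return "unordered";
    }
  }

  /** An exchange encapsulates exactly one sender and one receiver. */
  abstract static class Exchange {
    private final Sender _sender;
    private final Receiver _receiver;

    Exchange(Sender sender, Receiver receiver) {
      _sender = sender;
      _receiver = receiver;
    }

    Sender getSender() {
      return _sender;
    }

    Receiver getReceiver() {
      return _receiver;
    }

    String describe() {
      return _sender.describe() + " -> " + _receiver.describe();
    }
  }

  /** Fixes the sender kind; the receiver ("SomeReceiver" above) is chosen by the caller. */
  static final class BroadcastExchange extends Exchange {
    BroadcastExchange(Receiver receiver) {
      super(new BroadcastSender(), receiver);
    }
  }

  static final class HashPartitionExchange extends Exchange {
    HashPartitionExchange(Receiver receiver) {
      super(new HashPartitionSender(), receiver);
    }
  }
}
```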

Contributor Author

@walterddr walterddr Mar 17, 2022

Yes, I agree that we need to add more attributes to the stage nodes.
Should we consider starting simple and adding attributes to the ExchangeNode?

To me, the only things we need to separate are SendExchangeNode and ReceiveExchangeNode. The items you mentioned above can be inferred from the Exchange.Type:

  • e.g. a SendExchangeNode with Exchange.Type == BROADCAST results in a broadcastSender

The benefit is that we can add more attributes to the ExchangeNode without exploding the number of possible attribute combinations, say if we later want a HashPartitionButOrderedWithinPartitionSender.
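
For comparison, the attribute-based alternative in this reply might be sketched like this. The `ExchangeType` enum, the `orderedWithinPartition` flag and the `senderKind()` helper are hypothetical names chosen for the example; they are not the PR's API or Calcite's distribution types.

```java
// Hypothetical sketch of the attribute-based alternative; names are illustrative only.
final class ExchangeNodeSketch {
  enum ExchangeType { BROADCAST, SINGLETON, HASH }

  /** One node class; behaviour is inferred from attributes instead of subclasses. */
  static final class SendExchangeNode {
    private final ExchangeType _type;
    private final boolean _orderedWithinPartition;

    SendExchangeNode(ExchangeType type, boolean orderedWithinPartition) {
      _type = type;
      _orderedWithinPartition = orderedWithinPartition;
    }

    /** Infers which sender behaviour this node asks for. */
    String senderKind() {
      String base;
      switch (_type) {
        case BROADCAST:
          base = "broadcastSender";
          break;
        case SINGLETON:
          base = "singleSender";
          break;
        case HASH:
          base = "hashPartitionSender";
          break;
        default:
          throw new IllegalStateException("unknown exchange type: " + _type);
      }
      // A new attribute refines the behaviour without requiring a new class.
      return _orderedWithinPartition ? base + "(ordered within partition)" : base;
    }
  }
}
```

Here the "hash partition but ordered within partition" case is one extra boolean rather than one extra sender class, which is the combinatorial argument the reply makes.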


public abstract class AbstractStageNode implements StageNode {

protected final String _stageId;
Contributor

@siddharthteotia siddharthteotia Mar 17, 2022

Should this be an int (to reduce heap usage)? Or do we think this is arbitrary bytes and String is better?

Contributor

Discussed offline - we will consider this during serialization design

Contributor Author

Adding a TODO to the PR description.

return new QueryPlan(_queryStageMap, _stageMetadataMap);
}

// non-threadsafe
Contributor

I had a thread-safety question about QueryEnvironment. If that class is instantiated per compiled query, does that imply calls to StagePlanner should be thread safe?

Contributor Author

The non-thread-safety comes more from Calcite's planner (and RelNode). My interpretation is that two queries cannot be in planning at the same time, but the planner can be reused.
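
One way to read "not two queries in planning at once, but reusable" is a single planner guarded by a lock. The sketch below assumes that reading; `SerializedPlanningSketch` and `ReusablePlanner` are hypothetical and only imitate a planner that keeps per-query scratch state.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; ReusablePlanner stands in for a non-thread-safe planner.
final class SerializedPlanningSketch {
  /** Not thread-safe: keeps per-query scratch state while a plan is being built. */
  static final class ReusablePlanner {
    private String _currentSql;

    String plan(String sql) {
      _currentSql = sql;                      // scratch state concurrent callers would clobber
      String plan = "plan(" + _currentSql + ")";
      _currentSql = null;                     // reset so the planner can be reused
      return plan;
    }
  }

  private final ReusablePlanner _planner = new ReusablePlanner();
  private final ReentrantLock _planningLock = new ReentrantLock();

  /** At most one query is in planning at a time; the planner itself is reused. */
  String planQuery(String sql) {
    _planningLock.lock();
    try {
      return _planner.plan(sql);
    } finally {
      _planningLock.unlock();
    }
  }
}
```

The lock makes planning a serial bottleneck, which is the scalability TODO raised earlier in the review; creating a planner per call is the other option discussed there.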

@walterddr walterddr marked this pull request as ready for review March 20, 2022 14:58
- fix calcite upgrade compilation issue
- fix query compilation runtime after calcite 1.29 upgrade
- linter
@Override
public Expression getExpression(@Nullable SchemaPlus parentSchema, String name) {
requireNonNull(parentSchema, "parentSchema");
return Schemas.subSchemaExpression(parentSchema, name, getClass());
Contributor

We have a flat namespace as of now, so we don't support sub-schemas, and the Calcite root schema is created with an empty name. So what is this code doing with a sub-schema?

Contributor Author

This code is a default implementation. In our case it is as good as returning null, since we don't support it.


@Override
public RelProtoDataType getType(String name) {
return null;
Contributor

@siddharthteotia siddharthteotia Mar 24, 2022

@walterddr For this and all the functions below, we should ideally throw UnsupportedOperationException instead of returning null or an empty list. We probably can't predict all the places from which Calcite's planning code will call them, and if it does, it is better to fail the compilation through this exception.
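
The fail-fast suggestion can be illustrated with a small, self-contained sketch. `FlatSchema` is an invented interface that only resembles a schema; it is not Calcite's `Schema`, and `StrictCatalog` is a hypothetical name.

```java
import java.util.Set;

// Hypothetical sketch; FlatSchema is an invented interface, not Calcite's Schema.
final class FailFastSchemaSketch {
  interface FlatSchema {
    String getTable(String name);

    FlatSchema getSubSchema(String name);

    Set<String> getFunctionNames();
  }

  /** Supports table lookup only; every unsupported hook fails loudly instead of returning null. */
  static final class StrictCatalog implements FlatSchema {
    @Override
    public String getTable(String name) {
      return "table:" + name;
    }

    @Override
    public FlatSchema getSubSchema(String name) {
      throw new UnsupportedOperationException("sub-schema is not supported: " + name);
    }

    @Override
    public Set<String> getFunctionNames() {
      throw new UnsupportedOperationException("schema-level functions are not supported");
    }
  }
}
```

Whether a real planner calls such hooks on its normal path is not something this sketch checks; if it does, throwing would break ordinary queries, so each hook needs to be verified before switching from `null` to an exception.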

@Override
public Schema getSubSchema(String name) {
return null;
Contributor

@siddharthteotia siddharthteotia Mar 24, 2022

@walterddr It looks like Calcite doesn't expect this to be null?

As per the Calcite docs, during query validation Calcite will call getSubSchema() on the registered root schema and then, on the retrieved Schema, call getTable(schemaName) to get the Table / PinotTable?

Our root schema should be PinotCatalog but, based on the above, I wonder how query validation is going to work when this function is invoked?

On the other hand, if we create a dummy root schema with exactly one child / sub-schema as PinotCatalog, then this seems to work:

  • dummyRootSchema.getSubSchema("Pinot") returns an instance of PinotCatalog
  • catalog.getTable(tableName) returns the corresponding PinotTable

Contributor Author

@walterddr walterddr Mar 24, 2022

This is only true if we registered the user-facing Schema class and hoisted its contents up into CalciteSchema.

For example, if the user-overridden schema class contains tables and user-defined functions, one can register them by extracting all the tables into CalciteSchema.tableMap, the functions into CalciteSchema.functionMap, etc.

This is not ideal for Pinot because PinotCatalog is actually backed by TableCache, which is an ever-changing list of tables.
Therefore we use SimpleCalciteSchema, which doesn't go through the protected member variables inside CalciteSchema and instead falls through directly to the user-facing schema to acquire the data.
E.g. instead of getTable() { return tableMap.get(tableName); }, it calls Schema.getTable() directly.

This way we don't have to create a Calcite schema object every time a new query comes in, which is one of the reasons why we can have one query environment and reuse it across multiple queries.

Obviously there is a drawback: if the schema/table changes in the middle of planning, there can potentially be a race condition. But IMO we are better off failing the query and retrying in this case, since schema/table config changes don't happen that often, and recreating an entire planner context for every query would cost more valuable E2E latency.
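
The difference between the two lookup strategies described in this reply can be sketched without Calcite. The classes below (`LiveTableCache`, `SnapshotSchema`, `FallThroughSchema`) are hypothetical stand-ins that only imitate "copy tables into a local map" versus "delegate every lookup to the live source".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; these classes only imitate the two lookup strategies discussed.
final class SchemaLookupSketch {
  /** Stand-in for a table cache whose contents change while the process runs. */
  static final class LiveTableCache {
    private final Map<String, String> _tables = new ConcurrentHashMap<>();

    void addTable(String name) {
      _tables.put(name, "table:" + name);
    }

    String getTable(String name) {
      return _tables.get(name);
    }
  }

  /** Copies tables into its own map up front, so later additions are invisible. */
  static final class SnapshotSchema {
    private final Map<String, String> _tableMap = new HashMap<>();

    SnapshotSchema(LiveTableCache cache, String... names) {
      for (String name : names) {
        _tableMap.put(name, cache.getTable(name));
      }
    }

    String getTable(String name) {
      return _tableMap.get(name);
    }
  }

  /** Delegates every lookup to the live cache, so it never needs rebuilding. */
  static final class FallThroughSchema {
    private final LiveTableCache _cache;

    FallThroughSchema(LiveTableCache cache) {
      _cache = cache;
    }

    String getTable(String name) {
      return _cache.getTable(name);
    }
  }
}
```

A table added after construction is visible through `FallThroughSchema` but not through `SnapshotSchema`. The price, as the reply notes, is that a lookup can observe a change in the middle of planning.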

Contributor

@siddharthteotia siddharthteotia left a comment

lgtm

@siddharthteotia siddharthteotia merged commit 5984af9 into apache:multi_stage_query_engine Mar 25, 2022
Jackie-Jiang pushed a commit that referenced this pull request Apr 6, 2022
* add pinot-query-planner

- fix calcite upgrade compilation issue
- fix query compilation runtime after calcite 1.29 upgrade
- linter

* address diff comments and add more TODOs

Co-authored-by: Rong Rong <rongr@startree.ai>
walterddr added a commit to walterddr/pinot that referenced this pull request May 6, 2022
walterddr added a commit to walterddr/pinot that referenced this pull request May 17, 2022
walterddr added a commit to walterddr/pinot that referenced this pull request May 24, 2022
walterddr added a commit to walterddr/pinot that referenced this pull request Jun 8, 2022
walterddr added a commit that referenced this pull request Jun 8, 2022
@walterddr walterddr deleted the pr_query_planner branch December 6, 2023 16:23
5 participants