SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins #1384

Merged
atoomula merged 14 commits into apache:master from atoomula:queryopt
Jul 20, 2020

@atoomula atoomula commented Jun 14, 2020

Feature:
Samza SQL currently does not have a query optimizer. As a result, we rely on users to write their queries in a way that keeps SQL execution efficient, which is often clumsy and non-intuitive. Adding query optimization lets users write more intuitive SQL while still producing an optimized plan.

Changes
This change adds a rule-based optimizer (HepPlanner) to the Samza SQL query planner. As part of this commit, a FilterJoinRule is added for remote table joins.

Added remote table join tests to ensure the plan is optimized.

@atoomula atoomula requested a review from shanthoosh June 14, 2020 16:50
@mynameborat
Contributor

Can you please update the PR description according to the guidelines at https://cwiki.apache.org/confluence/display/SAMZA/SEP-25%3A+PR+Title+And+Description+Guidelines


private RelRoot optimize(RelRoot relRoot) {
RelTraitSet relTraitSet = RelTraitSet.createEmpty();
relTraitSet = relTraitSet.plus(EnumerableConvention.INSTANCE);
Contributor

This basically tricks the planner into using the Enumerable convention with the Volcano planner to generate a plan whose convention is EnumerableConvention.INSTANCE. That will lead to complex branches down the line when the conversion is done as a follow-up stage. Don't you think the best way to do this is to use Calcite conventions, where rules and translation happen at the same time, to avoid complex code and very cryptic comments about what to expect during translation?

Contributor Author

This is a vestige of my earlier attempt to enable the Volcano planner. I have removed it now.

LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.ALL_ATTRIBUTES));
return relRoot;
LOG.info("query plan without optimization:\n"
+ RelOptUtil.toString(relRoot.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
Contributor

When would you turn off the optimization? If it is just an optimization, shouldn't it always be turned on?

Contributor Author

I'm treating this as experimental for now, since this is the first version. Once we vet it through real use cases and gain confidence, we can enable it by default.

Contributor

If the main concern is that planning will fail with the new rules, my suggestion is to have it on by default, catch the exception, and re-plan without optimization. That way we can learn from the logs. It is up to you if you think this would be too much work.

Contributor Author

I'm not strongly against turning it on by default. I have turned it on by default now.
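
The "optimize by default, re-plan on failure" suggestion from this thread could look roughly like the sketch below. It is a minimal illustration under stated assumptions, not the code merged in this PR: `PlanWithFallback` and its `plan` method are hypothetical names, and plain `Supplier`s stand in for the optimizing and non-optimizing planning paths.

```java
import java.util.function.Supplier;

// Hypothetical helper (not part of Samza): try the optimized planning path first
// and fall back to the unoptimized plan if optimization throws.
final class PlanWithFallback {

  static <T> T plan(Supplier<T> optimized, Supplier<T> unoptimized) {
    try {
      return optimized.get();
    } catch (RuntimeException e) {
      // Log the failure so that problems with the new rules stay visible.
      System.err.println("Error while optimizing query plan, re-planning without optimization: " + e);
      return unoptimized.get();
    }
  }

  public static void main(String[] args) {
    // The optimized path fails here, so the unoptimized plan is returned.
    String plan = plan(
        () -> { throw new IllegalStateException("rule failed"); },
        () -> "unoptimized plan");
    System.out.println(plan); // prints "unoptimized plan"
  }
}
```

With this shape the optimization can stay enabled by default while a failing rule only costs one extra planning pass.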


//~ Methods ----------------------------------------------------------------

protected void perform(RelOptRuleCall call, Filter filter,
Contributor

As far as I can tell, this only tests whether the filter is on the appropriate side of the join. It seems to me that most of the work can be done in onMatch. Can we just extend the rule and override onMatch?

Contributor Author

I plan to add a JoinConditionPushRule as well in the next iteration (with no filter), which will reuse most of the code here, while its onMatch will be different.

Contributor

In my opinion, minimizing the copy and paste would be ideal, with the rest handled in a follow-up. But it is up to you; this is not a blocking point.

}

public RelRoot plan(String query) {
private Planner getPlanner() {
Contributor

The planner is a closeable resource. I think it would be better to use it within a try block, or otherwise make sure to close it when done.

Contributor Author

Sure. Added code to close it now.
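
For reference, closing the planner with try-with-resources, as suggested above, can be sketched as follows. `DemoPlanner` and `PlannerUsage` are hypothetical stand-ins so the example is self-contained; they are not Calcite or Samza classes, and the real planner's API is not reproduced here.

```java
// Hypothetical stand-in for a closeable planner (illustrative only).
final class DemoPlanner implements AutoCloseable {
  // Tracks how many planners are currently open, to show that close() runs.
  static int openCount = 0;

  DemoPlanner() {
    openCount++;
  }

  String plan(String sql) {
    if (sql.isEmpty()) {
      throw new IllegalArgumentException("empty query");
    }
    return "plan(" + sql + ")";
  }

  @Override
  public void close() {
    openCount--;
  }
}

final class PlannerUsage {
  // The planner is closed when the try block exits, normally or by exception.
  static String planOnce(String sql) {
    try (DemoPlanner planner = new DemoPlanner()) {
      return planner.plan(sql);
    }
  }

  public static void main(String[] args) {
    System.out.println(planOnce("select 1"));
    System.out.println("open planners: " + DemoPlanner.openCount);
  }
}
```

Because the try block owns the planner, no explicit close() call is needed on the error path.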

JoinInputNode.getInputType(join.getRight(), systemStreamConfigBySource);

// Disable this optimnization for queries using local table.
if (inputTypeOnLeft == InputType.LOCAL_TABLE || inputTypeOnRight == InputType.LOCAL_TABLE) {
Contributor

Why are we turning this off for local tables, where things work fine? I'm not sure I follow.

Contributor Author

More optimizations could be done for local tables, since local tables do not have the limitations that remote tables have. We could directly enable Calcite's FilterJoinRule for local tables.

Contributor

@b-slim b-slim left a comment

Overall looks good to me. I have some minor comments:

  • Close the Planner.
  • If possible, plan with optimization on and re-plan on exception, if the concern is planning issues.
  • If possible, I highly recommend avoiding copying complex code. Handling this within the onMatch call would lead to a better first cut.

Contributor

@shanthoosh shanthoosh left a comment

Thanks for the changes.

try {
RelRoot optimizedRelRoot =
RelRoot.of(getPlanner().transform(0, relTraitSet, relRoot.project()), SqlKind.SELECT);
LOG.info("query plan with optimization:\n"
Contributor

Nit:

  1. By default, the Logger adds \n at EOL. It is unnecessary to add it explicitly.
  2. Also, it's better to capitalize all log messages.

Contributor Author

  1. The \n is not for EOL. It is for separating the log message from the plan.
  2. Do you mean all log messages in Samza or just this one? And what is the point of capitalizing? Is it to catch the eye while going through logs? If so, we don't need to, as their format already makes them stand out in the logs:

2020-07-19 10:11:16.814 [main] [] QueryPlanner [INFO] query plan without optimization:
LogicalProject(key=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
  LogicalFilter(condition=[AND(=($2, 'Mike'), =($10, 1))])
    LogicalProject(key=[$0], id=[$1], name=[$2], companyId=[$3], address=[$4], selfEmployed=[$5], phoneNumbers=[$6], mapValues=[$7], __key__0=[$8], pageKey=[$9], profileId=[$10])
      LogicalJoin(condition=[=($0, $11)], joinType=[inner])
        LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
        LogicalProject(key=[$0], pageKey=[$1], profileId=[$2], $f3=[BuildOutputRecord('id', $2)])
          LogicalTableScan(table=[[testavro, PAGEVIEW]])

2020-07-19 10:11:16.816 [main] [] QueryPlanner [INFO] query plan with optimization:
LogicalProject(key=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
  LogicalFilter(condition=[=($2, 'Mike')])
    LogicalJoin(condition=[=($0, $11)], joinType=[inner])
      LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
      LogicalFilter(condition=[=($2, 1)])
        LogicalProject(key=[$0], pageKey=[$1], profileId=[$2], $f3=[BuildOutputRecord('id', $2)])
          LogicalTableScan(table=[[testavro, PAGEVIEW]])
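
The difference between the two plans above can be reproduced with a small, self-contained sketch of filter classification. This is only an illustration, not the PR's SamzaSqlFilterRemoteJoinRule or Calcite's classifyFilters: the `FilterPushDownSketch` class, its `Conjunct` model (one field reference compared to one literal), and the assumption that the remote-table side (Profile, the left input with 8 fields) is not a push-down target are all hypothetical simplifications. It also shows the output lists being populated by the classify call, which is the pattern a later review comment asks to document.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; names and the conjunct model are assumptions.
final class FilterPushDownSketch {

  // Models a conjunct of the form "=($fieldIndex, literal)" over the join's output row.
  static final class Conjunct {
    final int fieldIndex;
    final String literal;

    Conjunct(int fieldIndex, String literal) {
      this.fieldIndex = fieldIndex;
      this.literal = literal;
    }
  }

  static String render(int fieldIndex, String literal) {
    return "=($" + fieldIndex + ", " + literal + ")";
  }

  // Populates aboveFilters, leftFilters and rightFilters, which act as out-parameters.
  static void classify(List<Conjunct> conjuncts, int leftFieldCount,
      boolean pushLeft, boolean pushRight,
      List<String> aboveFilters, List<String> leftFilters, List<String> rightFilters) {
    for (Conjunct c : conjuncts) {
      boolean refersToLeft = c.fieldIndex < leftFieldCount;
      if (refersToLeft && pushLeft) {
        leftFilters.add(render(c.fieldIndex, c.literal));
      } else if (!refersToLeft && pushRight) {
        // Re-index the field relative to the right input.
        rightFilters.add(render(c.fieldIndex - leftFieldCount, c.literal));
      } else {
        // Not pushable: the conjunct stays in the filter above the join.
        aboveFilters.add(render(c.fieldIndex, c.literal));
      }
    }
  }

  public static void main(String[] args) {
    List<Conjunct> conjuncts = new ArrayList<>();
    conjuncts.add(new Conjunct(2, "'Mike'")); // references the Profile side
    conjuncts.add(new Conjunct(10, "1"));     // references the PAGEVIEW side
    List<String> above = new ArrayList<>();
    List<String> left = new ArrayList<>();
    List<String> right = new ArrayList<>();
    // Left input has 8 fields ($0..$7); pushing to the left (remote) side is disabled.
    classify(conjuncts, 8, false, true, above, left, right);
    // prints: above=[=($2, 'Mike')] left=[] right=[=($2, 1)]
    System.out.println("above=" + above + " left=" + left + " right=" + right);
  }
}
```

The result matches the optimized plan in the log: the 'Mike' predicate stays above the join, and the profileId predicate moves below it as =($2, 1).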

QueryPlanner planner = getQueryPlanner(getSqlConfig(sqlStmts, config));
List<RelRoot> relRoots = new LinkedList<>();
for (String sql: sqlStmts) {
QueryPlanner planner = getQueryPlanner(getSqlConfig(Collections.singletonList(sql), config));
Contributor

What is the rationale for recreating the planner for every sql-statement in the app?

Contributor Author

Good question. The Calcite Planner, as it stands today, does not seem to support reuse. The intent is there, since a reset API is exposed, but it does not work. In any case, creating a new planner for each SQL statement is very cheap.

sqlOperatorTables.add(new SamzaSqlOperatorTable());
sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));

// TODO: Introduce a pluggable rule factory.
Contributor

Would be better to create a follow-up ticket for this action-item.

Contributor Author

Sure

JoinInputNode.InputType inputTypeOnRight =
JoinInputNode.getInputType(join.getRight(), systemStreamConfigBySource);

// Disable this optimnization for queries using local table.
Contributor

nit: %s/optimnization/optimization

filter != null
? RelOptUtil.conjunctions(filter.getCondition())
: new ArrayList<>();
final com.google.common.collect.ImmutableList<RexNode> origAboveFilters =
Contributor

Please import the ImmutableList class and don't hardcode the package paths. There are multiple occurrences in this file and elsewhere.

!joinType.generatesNullsOnLeft() && !donotOptimizeLeft,
!joinType.generatesNullsOnRight() && !donotOptimizeRight,
joinFilters,
leftFilters,
Contributor

leftFilters and rightFilters are initialized but not visibly modified, so it's very hard to find where they're populated. Please add a comment here stating that leftFilters and rightFilters will be populated by this classifyFilters method.

LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.ALL_ATTRIBUTES));
return relRoot;
LOG.info(
"query plan without optimization:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
Contributor

Please capitalize log messages and remove the \n at the end (which gets added by default).

Contributor Author

Responded in earlier comment.

String errorMsg =
"Error while optimizing query plan:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
LOG.error(errorMsg, e);
planner.close();
Contributor

Calling close() here seems unnecessary. The planner is already closed by the caller via try-with-resources.

Contributor Author

oh yeah.. good point!

sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));

// TODO: Introduce a pluggable rule factory.
List<RelOptRule> rules = ImmutableList.of(
Contributor

Is there a way to determine whether these rel-rules are applied to a rel-plan and to emit a metric (or log it before/after the optimization) for debugging purposes?

Contributor Author

AFAIK, only the Hep/Volcano planners make such decisions. I'm not sure we can determine that in the rule itself.

* This class is customized form of Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule} for
* remote table joins.
*/
public abstract class SamzaSqlFilterRemoteJoinRule extends RelOptRule {
Contributor

Just curious. There seems to be considerable duplication with Calcite's native FilterJoinRule class. Since CALCITE-3170, Calcite natively supports anti-join ON condition push-down. If we upgrade to the 1.21.0 rel-planner, wouldn't overriding match suffice here?

Contributor Author

Are you talking about just anti-joins or about this rule in general? If the latter, Slim has a comment on this as well and I responded to him. I will have to add another rule for join conditions, which reuses the same logic. Let me see at that time whether I can inherit from the Calcite rule and just override match.

@atoomula atoomula merged commit f665824 into apache:master Jul 20, 2020
lakshmi-manasa-g pushed a commit to lakshmi-manasa-g/samza that referenced this pull request Feb 9, 2021
apache#1384)

* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins

* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins

* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins

* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins

* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins

* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins

* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins

* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins

* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins

* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins

* Fix checkstyle errors

* Fix checkstyle errors

* Fix checkstyle errors

* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins

4 participants