DRILL-5691: enhance scalar sub queries checking for the cartesian join #889

Closed (wants to merge 1 commit)

Conversation

@weijietong (Author):
No description provided.

@weijietong (Author):

The build passes on my laptop. Could someone please look into this?

@weijietong weijietong changed the title Drill 5691 enhance scalar sub queries checking for the cartesian join Drill-5691: enhance scalar sub queries checking for the cartesian join Jul 31, 2017
@weijietong weijietong changed the title Drill-5691: enhance scalar sub queries checking for the cartesian join DRILL-5691: enhance scalar sub queries checking for the cartesian join Jul 31, 2017
@arina-ielchiieva (Member):

@weijietong thanks for the PR.

  1. Is it possible to add unit tests?
  2. Can you please note in the method description that a table scan with only one row should also be considered scalar?
  3. Please fix the indentation in the if statements and change currentrel to camel case [1].

[1] https://drill.apache.org/docs/apache-drill-contribution-guidelines/

@weijietong (Author):

@arina-ielchiieva I have corrected the code as you suggested. Sorry about the unit tests: I tried for a long time to simulate a scan with one row, but failed. The row count of a scan is fetched from the AbstractGroupScan.getScanStats method. In my plugin, I override this method to ensure it returns 1.
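The getScanStats override he describes could look roughly like the following. This is a minimal, self-contained sketch using hypothetical stand-in classes (ScanStatsModel, GroupScanModel, and SingleRowGroupScan are illustrative names, not Drill's actual AbstractGroupScan/ScanStats API):

```java
// Hypothetical stand-ins modeling a storage-plugin scan whose statistics
// always report a single row, so the planner treats it as a scalar input.
public class SingleRowScanDemo {

    static class ScanStatsModel {
        final double recordCount;

        ScanStatsModel(double recordCount) {
            this.recordCount = recordCount;
        }
    }

    static abstract class GroupScanModel {
        abstract ScanStatsModel getScanStats();
    }

    static class SingleRowGroupScan extends GroupScanModel {
        @Override
        ScanStatsModel getScanStats() {
            // The planner reads the row count from these stats,
            // so it sees this scan as producing exactly one row.
            return new ScanStatsModel(1);
        }
    }

    public static void main(String[] args) {
        GroupScanModel scan = new SingleRowGroupScan();
        System.out.println(scan.getScanStats().recordCount); // 1.0
    }
}
```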

boolean hasMoreInputs = false;
while (agg == null && currentRel != null) {
if (currentRel instanceof DrillAggregateRel) {
agg = (DrillAggregateRel)currentRel;
Review comment (Member):
agg = (DrillAggregateRel) currentRel;

if (currentRel instanceof DrillAggregateRel) {
agg = (DrillAggregateRel)currentRel;
} else if (currentRel instanceof RelSubset) {
currentRel = ((RelSubset)currentRel).getBest() ;
Review comment (Member):
currentRel = ((RelSubset) currentRel).getBest() ;

} else {
if(currentRel.getInputs().size()>1){
Review comment (Member):
Please add spaces: if (currentRel.getInputs().size() > 1) {

} else {
if(currentRel.getInputs().size()>1){
hasMoreInputs=true;
Review comment (Member):
Please add spaces: hasMoreInputs = true;

@@ -231,6 +236,12 @@ public static boolean isScalarSubquery(RelNode root) {
return true;
}
}
if(!hasMoreInputs && currentRel!=null){
Review comment (Member):
Please add spaces: if (!hasMoreInputs && currentRel != null) {

@@ -231,6 +236,12 @@ public static boolean isScalarSubquery(RelNode root) {
return true;
}
}
if(!hasMoreInputs && currentRel!=null){
double rowSize = RelMetadataQuery.instance().getMaxRowCount(currentRel);
if(rowSize==1){
Review comment (Member):
Please add spaces: if (rowSize == 1) {

@@ -325,4 +325,5 @@ public void testNlJoinWithLargeRightInputSuccess() throws Exception {
test(RESET_JOIN_OPTIMIZATION);
}
}

Review comment (Member):
Please revert changes in this file.

@arina-ielchiieva (Member):

@weijietong regarding the unit test, I have tried to reproduce the problem and written the following unit test:

  @Test
  public void test() throws Exception {
    FileSystem fs = null;
    try {
      fs = FileSystem.get(new Configuration());

      // create table with partition pruning
      test("use dfs_test.tmp");
      String tableName = "table_with_pruning";
      Path dataFile = new Path(TestTools.getWorkingPath(), "src/test/resources/parquet/alltypes_required.parquet");
      test("create table %s partition by (col_int) as select * from dfs.`%s`", tableName, dataFile);

      // generate metadata
      test("refresh table metadata `%s`", tableName);
      
      // execute query
      String query = String.format("select count(distinct col_int), count(distinct col_chr) from `%s` where col_int = 45436", tableName);
      test(query);

    } finally {
      if (fs != null) {
        fs.close();
      }
    }
  }

AbstractGroupScan.getScanStats method returns one row but it does not fail. Can you please take a look?

@weijietong (Author):

@arina-ielchiieva your test case cannot reproduce the error. You can search the dev mailing list for the original error description with the keyword "Drill query planning error". Your query already satisfies the NestedLoopJoinPrule. In my case, I added another rule that transforms Aggregate-->Aggregate-->Scan into a single Scan, since the transformed Scan RelNode already holds the count(distinct) value. When this transformation occurs, the NestedLoopJoinPrule's checkPreconditions method invokes JoinUtils.hasScalarSubqueryInput, which then fails, because the transformed RelNode has no aggregate node and therefore does not satisfy the current scalar check.

I think it's hard to reproduce this error without a specific rule like mine. The preconditions are:

  1. a nested loop join
  2. no (Aggregate --> Aggregate) count-distinct relation nodes in the plan
  3. one child of the nested loop join has a row count of 1

I wonder if it is enough that the enhanced code does not break the current unit tests.

@arina-ielchiieva (Member) commented Aug 9, 2017:

@weijietong thanks for code formatting.

> I wonder if the enhanced code does not break the current unit test, it will be ok.

Can you please check this? Did all unit tests pass after the change?

@weijietong (Author):

@arina-ielchiieva I have run all the unit tests under the java-exec bundle. I got some errors that are surely not associated with this change; maybe my branch was behind master.

TestConstantFolding.testConstantFolding_allTypes:163 » org.apache.drill.commo...
TestCustomUserAuthenticator.positiveUserAuth » UserRemote SYSTEM ERROR: URISyn...
TestCustomUserAuthenticator.positiveUserAuthAfterNegativeUserAuth » UserRemote
TestInfoSchema.selectFromAllTables » UserRemote SYSTEM ERROR: URISyntaxExcepti...
TestViewSupport.infoSchemaWithView:355->BaseTestQuery.testRunAndReturn:331 » Rpc
TestInfoSchemaFilterPushDown.testFilterPushdown_NonEqual » UserRemote SYSTEM E...
TestParquetScan.testSuccessFile:64->BaseTestQuery.testRunAndReturn:331 » Rpc o...
TestTpchDistributedConcurrent.testConcurrentQueries:190 » test timed out afte...

I also got an error when running mvn test: the same error as JIRA DRILL-4104. Using mvn test -pl exec/java-exec instead, I got the unit test results. I wonder how the devs run the tests.

@arina-ielchiieva (Member) commented Aug 10, 2017:

On my environment all tests pass. I used to have trouble, but that was a long time ago; I had to fix issues with timezones and memory usage (MAVEN_OPTS="-Xmx2048m -XX:MaxPermSize=1024m"). Also, I am pretty sure that some tests failed because others did, usually the ones with an Rpc exception. Maybe the UserRemote SYSTEM ERROR: URISyntaxException is connected with your environment; you should check that. Also please rebase on the latest master (you can push -f again), and if TestTpchDistributedConcurrent.testConcurrentQueries fails again, try disabling it, but make sure it passes separately. I usually run all tests with mvn clean install, not by modules.

@weijietong (Author):

@arina-ielchiieva the other tests pass when run separately, and the timezone-related test cases have been corrected in PR 5717.

@arina-ielchiieva (Member):

Thanks @weijietong. LGTM.
@amansinha100 could you please do the final CR since you are familiar with this issue?

@@ -231,6 +236,12 @@ public static boolean isScalarSubquery(RelNode root) {
return true;
}
}
if (!hasMoreInputs && currentRel != null) {
double rowSize = RelMetadataQuery.instance().getMaxRowCount(currentRel);
Review comment (Member):
As @vvysotskyi has noted, getMaxRowCount returns a Double and may return null if the method is not overridden for the operator [1]. I guess it would be safer to assign the value to a Double and use equals for the comparison (new Double(1.0).equals(rowSize)).

[1] https://github.com/apache/calcite/blob/ceca631872ec64fa345f7df9cdf185e1f9252449/core/src/main/java/org/apache/calcite/rel/metadata/RelMdMaxRowCount.java#L182
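The suggestion works because calling equals on a boxed constant returns false for a null argument instead of throwing, whereas unboxing a null Double (as `rowSize == 1` would require) throws a NullPointerException. A minimal self-contained sketch (the class and method names here are illustrative):

```java
public class ScalarRowCheck {

    // Null-safe check that a possibly-null max row count equals 1.0.
    // Double.valueOf(1.0) is the non-deprecated equivalent of new Double(1.0);
    // equals(null) simply returns false, so a null rowSize never throws.
    static boolean isExactlyOneRow(Double rowSize) {
        return Double.valueOf(1.0).equals(rowSize);
    }

    public static void main(String[] args) {
        System.out.println(isExactlyOneRow(null)); // false, no NullPointerException
        System.out.println(isExactlyOneRow(1.0));  // true
        System.out.println(isExactlyOneRow(2.0));  // false
    }
}
```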

@weijietong (Author) commented Aug 23, 2017:

@arina-ielchiieva thanks for the advice, have corrected that.

@arina-ielchiieva (Member) commented Aug 23, 2017:

+1, LGTM.

} else {
if (currentRel.getInputs().size() > 1) {

Review comment:

It seems you are interested only in the 0 input case such as TableScan since later on line 239 you check for !hasMoreInputs. In that case, I don't think you need to set this flag. Why not just check currentRel.getInputs().size() == 0 on line 239 ?

@weijietong (Author) replied:

@amansinha100 thanks for the review. This flag indicates that the node is a single-input RelNode, not a join node.

@@ -231,6 +236,12 @@ public static boolean isScalarSubquery(RelNode root) {
return true;
}
}
if (!hasMoreInputs && currentRel != null) {
Double rowSize = RelMetadataQuery.instance().getMaxRowCount(currentRel);

Review comment:

Actually, I am thinking this entire method isScalarSubquery() should be rewritten to call RelMetadataQuery.getMaxRowCount(). Originally, when this was written, the max row count interface was not available.
I think your use case, where the table has exactly one row, is somewhat specific. However, suppose we had LIMIT 1 in the subquery: we could still treat it as scalar, since it would produce at most 1 row. My thought is that instead of solving your specific table scan case, we might as well address the generalized scalar check, since Calcite already provides it.

@weijietong (Author) replied:

I agree that we should rewrite the method to call RelMetadataQuery.getMaxRowCount(). By the way, could you give some guidance on how Calcite does that?

Reply:

You could see the usage of RelMdMaxRowCount in Calcite [1]. It has implementations for different Rels, including Aggregate and Sort+Limit. I would recommend adding some unit tests, though, if you are going to make the code changes, since this would be new functionality.

[1] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/metadata/RelMdMaxRowCount.java

@weijietong (Author):

@arina-ielchiieva @amansinha100 the isScalarSubquery method has been refactored; please review.

if (currentRel instanceof RelSubset) {
currentRel = ((RelSubset) currentRel).getBest();
} else if (currentRel != null) {
Double rowCount = relMetadataQuery.getRowCount(currentRel);

Review comment:

getRowCount() is not correct. Please see my prior comment on using the RelMdMaxRowCount.getMaxRowCount() APIs. The reason is that getRowCount() gives an estimate which may not match the actual run-time value, whereas getMaxRowCount() is an assertion by the optimizer that the row count cannot exceed a number N (in your case 1).

@weijietong (Author) replied:

@amansinha100 thanks for the review, I have updated the code.

return true;
RelMetadataQuery relMetadataQuery = RelMetadataQuery.instance();
RelNode currentRel = root;
for (; ; ) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If currentRel happens to be null, we should not enter the loop. Better to keep while (currentRel != null) and have the return false at the end.

@paul-rogers (Contributor):

@arina-ielchiieva, looks like Weijie added a couple of commits since your +1. Can you take a look at them?

@weijietong (Author):

@arina-ielchiieva @amansinha100 any further advice ?

@weijietong weijietong force-pushed the drill-5691 branch 2 times, most recently from 7077a7d to 4f9d39d Compare September 15, 2017 15:25

@Override
public Double getMaxRowCount(TableScan rel, RelMetadataQuery mq) {
return rel.getRows();

Review comment:

Sorry, I had been meaning to reply sooner. I think overloading the getMaxRowCount() to return rel.getRows() can create potential issues...because getMaxRowCount() should always return whatever is the maximum row count possible for that RelNode. Here, if you return TableScan.getRows(), the value is an estimate, which means in reality it could be higher. The caller might make incorrect decision based on this value.

I am thinking about your original motivation for the changes. Are you materializing the results into a single-row table? It sounds like you want a special table scan whose max row count is 1. Is materializing the only option? (The reason I am curious is that it is odd to materialize very small data sets such as 1 row.)

@weijietong (Author) replied:

@amansinha100 Your assumption is right. My initial motivation was to materialize the count-distinct value of one table column (without a group by clause) into a single-row table, i.e. a cache. To accelerate users' query time, this materialization seems reasonable.

Since getMaxRowCount() is only invoked by JoinUtils.isScalarSubquery, there are no potential issues.

if (agg != null) {
if (agg.getGroupSet().isEmpty()) {
Double rowCount = relMetadataQuery.getMaxRowCount(currentRel);
if (rowCount != null && rowCount.equals(1.0)) {

Review comment:

For scalar, the value could be <= 1.0 (0 also qualifies).
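Following that observation, the check would compare with <= rather than an exact equality, since an empty input (0 rows) is also at most one row. A minimal sketch with a hypothetical helper name:

```java
public class ScalarSubqueryCheck {

    // A subquery is scalar when the optimizer asserts it produces at most one
    // row; 0 rows also qualifies, hence <= 1.0 rather than equals(1.0). A null
    // value means no upper bound is known, so the check fails conservatively.
    static boolean isScalarMaxRowCount(Double maxRowCount) {
        return maxRowCount != null && maxRowCount <= 1.0;
    }

    public static void main(String[] args) {
        System.out.println(isScalarMaxRowCount(0.0));  // true: empty input qualifies
        System.out.println(isScalarMaxRowCount(1.0));  // true
        System.out.println(isScalarMaxRowCount(2.0));  // false
        System.out.println(isScalarMaxRowCount(null)); // false: unknown bound
    }
}
```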

@paul-rogers (Contributor):

@arina-ielchiieva, @amansinha100 can you take another look at this one and see if your concerns have been addressed?

@weijietong (Author):

@amansinha100 I have removed the override of the getMaxRowCount method for the TableScan type to address your concern about unexpected results. This PR's target is now adjusted to simply enhance the current cartesian-join check by leveraging Calcite's RelMdMaxRowCount. I will solve our case specially in our environment.

@weijietong (Author):

ping @arina-ielchiieva @amansinha100

@amansinha100:
@weijietong I ran the functional tests with the PR and saw some failures. While debugging those failures, I found the simple scalar aggregate case works but anything with compound aggregate expressions throws a CannotPlanException, for example:

explain plan for select count(*) from cp.tpch/lineitem.parquet l where l_quantity < (select max(l2.l_quantity) + 1 from cp.tpch/lineitem.parquet l2)

Here, the right side of the inequality join is actually scalar, but the JoinUtils.isScalar() method does not detect it correctly. The reason is that the right side is a DrillProjectRel whose input is a RelSubSet. The RelSubSet has a DrillAggregateRel, but it looks like Calcite currently does not detect that this is a scalar aggregate when it occurs as part of a RelSubSet. I see Calcite has a comment in [1] which points to an open JIRA, CALCITE-1048.

[1] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/metadata/RelMdMaxRowCount.java#L165

@amansinha100:
@weijietong I am interested in getting your basic changes in. It is unfortunate we are running into this issue with RelSubSet. Let me see if I can make some changes on top of your changes (I will keep your authorship) and have it pass all our functional regression tests.

@weijietong (Author):

@amansinha100 thanks for sharing the information, I got your point. I think your proposal on CALCITE-1048 is feasible. Since CALCITE-794 was completed in version 1.6, there seems to be a more complete solution (get the least maximum row count across all the rels of the RelSubSet), but since Drill's Calcite version is still based on 1.4, I support your current temporary solution. I only wonder whether the explicitly searched RelNode's (such as DrillAggregateRel) maxRowCount can represent the best RelNode's maxRowCount.

@weijietong weijietong closed this Oct 25, 2022