Skip to content

Commit

Permalink
[CARBONDATA-3922] Support order by limit push down for secondary inde…
Browse files Browse the repository at this point in the history
…x queries

Why is this PR needed?
a) Limit pushdown for SI is already supported, but the limit was still being
pushed down even when the order by column is not an SI column. This needs to be fixed.
b) When a limit is present and both the order by column and all the filter columns
are SI columns, we can push down order by + limit.
This can reduce SI output results and reduce the scan time in main table.
c) The SI transformation rule is applied even when no relation contains an SI.

What changes were proposed in this PR?
a) Block limit push down if order by column is not an SI column
b) When a limit is present and both the order by column and all the filter columns
are SI columns, push down order by + limit
c) Apply the SI transformation rule only when at least one relation contains an SI

This closes #3861
  • Loading branch information
ajantha-bhat authored and kunal642 committed Sep 11, 2020
1 parent b57d17b commit a59aec7
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package org.apache.carbondata.spark.testsuite.secondaryindex

import scala.collection.JavaConverters._

import org.apache.commons.lang3.StringUtils

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
.isFilterPushedDownToSI;
import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
import org.apache.spark.sql.{CarbonEnv, Row}
import org.scalatest.BeforeAndAfterAll

Expand All @@ -29,7 +30,6 @@ import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentSta
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.spark.exception.ProcessMetaDataException

import org.apache.spark.sql.test.util.QueryTest

class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
Expand Down Expand Up @@ -319,6 +319,66 @@ class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists maintable")
}

test("test SI order by limit push down") {
  sql("drop table if exists table2")
  sql("CREATE TABLE `table2` (`imsi` STRING, `carno` STRING, `longitude` STRING, `city` " +
      "STRING, `starttime` BIGINT, `endtime` BIGINT) STORED AS carbondata TBLPROPERTIES" +
      "('sort_scope'='global_sort','sort_columns'='starttime')")
  sql("create index table2_index1 on table table2(carno, longitude, starttime) as 'carbondata'")
  sql("create index table2_index2 on table table2(city) as 'carbondata'")
  sql("insert into table2 select 'aa','ka14','ll','abc',23,24 ")
  sql("insert into table2 select 'aa','ka14','ll','xyz',25,26 ")

  // Runs EXPLAIN for the given query and returns the first result row as a string,
  // so the physical plan can be inspected for pushdown markers.
  def explainedPlan(query: String): String = sql(query).collect()(0).toString()

  // Order by + limit pushdown is allowed when every filter column and the
  // order by column are covered by an SI table; TakeOrderedAndProject then
  // shows up twice in the plan (SI side and main-table side).
  // a. For selected projections
  val planA = explainedPlan(
    "explain SELECT imsi FROM table2 WHERE CARNO = 'ka14' AND LONGITUDE is not null ORDER BY " +
    "STARTTIME LIMIT 1")
  assert(StringUtils.countMatches(planA, "TakeOrderedAndProject") == 2)

  // b. For all projections
  val planB = explainedPlan(
    "explain SELECT * FROM table2 WHERE CARNO = 'ka14' AND LONGITUDE is not null ORDER BY " +
    "STARTTIME LIMIT 1")
  assert(StringUtils.countMatches(planB, "TakeOrderedAndProject") == 2)

  // Pushdown must be blocked when the order by column (endtime) is not an SI
  // column; only one TakeOrderedAndProject is expected.
  val planC = explainedPlan(
    "explain SELECT * FROM table2 WHERE CARNO = 'ka14' AND LONGITUDE is not null ORDER BY " +
    "endtime LIMIT 1")
  assert(StringUtils.countMatches(planC, "TakeOrderedAndProject") == 1)

  // Pushdown must also be blocked when a filter column (imsi) is not an SI column.
  val planD = explainedPlan(
    "explain SELECT * FROM table2 WHERE imsi = 'aa' AND LONGITUDE is not null ORDER BY " +
    "STARTTIME LIMIT 1")
  assert(StringUtils.countMatches(planD, "TakeOrderedAndProject") == 1)

  // A lone not-equal filter without order by must not be routed to the SI table.
  val planE = explainedPlan(
    "explain SELECT * FROM table2 WHERE CARNO != 'ka14' ")
  assert(!planE.contains("table2_index1"))

  // Same not-equal restriction holds when multiple SI tables are involved:
  // the equality filter may use table2_index1, but the != on city must not
  // use table2_index2.
  val planF = explainedPlan(
    "explain SELECT * FROM table2 WHERE CARNO = 'ka14' and CITY != 'ddd' ")
  assert(!planF.contains("table2_index2") && planF.contains("table2_index1"))

  sql("drop table table2")
}

override def afterAll {
sql("drop index si_altercolumn on table_WithSIAndAlter")
sql("drop table if exists table_WithSIAndAlter")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.sql.secondaryindex.optimizer

import org.apache.log4j.Logger
import org.apache.spark.sql.{CarbonUtils, SparkSession}
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonUtils, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
Expand All @@ -28,6 +28,7 @@ import org.apache.spark.util.SparkUtil

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.index.IndexType

/**
* Rule for rewriting plan if query has a filter on index table column
Expand All @@ -41,7 +42,17 @@ class CarbonSITransformationRule(sparkSession: SparkSession)
new CarbonSecondaryIndexOptimizer(sparkSession)

def apply(plan: LogicalPlan): LogicalPlan = {
if (checkIfRuleNeedToBeApplied(plan)) {
var hasSecondaryIndexTable = false
plan.collect {
case l: LogicalRelation if (!hasSecondaryIndexTable &&
l.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) =>
hasSecondaryIndexTable = l.relation
.asInstanceOf[CarbonDatasourceHadoopRelation]
.carbonTable
.getIndexTableNames(IndexType.SI.getIndexProviderName).size() > 0

}
if (hasSecondaryIndexTable && checkIfRuleNeedToBeApplied(plan)) {
secondaryIndexOptimizer.transformFilterToJoin(plan, isProjectionNeeded(plan))
} else {
plan
Expand Down

0 comments on commit a59aec7

Please sign in to comment.