SI order by push down
ajantha-bhat committed Aug 31, 2020
1 parent ed7e049 commit aabb5e1
Showing 3 changed files with 225 additions and 23 deletions.
@@ -18,9 +18,10 @@ package org.apache.carbondata.spark.testsuite.secondaryindex
 
 import scala.collection.JavaConverters._
 
+import org.apache.commons.lang3.StringUtils
+
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
-  .isFilterPushedDownToSI;
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.scalatest.BeforeAndAfterAll
 
@@ -29,7 +30,6 @@ import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentSta
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
-
 import org.apache.spark.sql.test.util.QueryTest
 
 class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
@@ -259,6 +259,66 @@ class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists maintable")
   }
 
+  test("test SI order by limit push down") {
+    sql("drop table if exists table2")
+    sql("CREATE TABLE `table2` (`imsi` STRING, `carno` STRING, `longitude` STRING, `city` " +
+        "STRING, `starttime` BIGINT, `endtime` BIGINT) STORED AS carbondata TBLPROPERTIES" +
+        "('sort_scope'='global_sort','sort_columns'='starttime')")
+    sql("create index table2_index1 on table table2(carno, longitude, starttime) as 'carbondata'")
+    sql("create index table2_index2 on table table2(city) as 'carbondata'")
+    sql("insert into table2 select 'aa','ka14','ll','abc',23,24 ")
+    sql("insert into table2 select 'aa','ka14','ll','xyz',25,26 ")
+
+    // Allow order by and limit push down as all the filter and order by columns are SI columns
+    // a. For selected projections
+    var plan = sql(
+      "explain SELECT imsi FROM table2 WHERE CARNO = 'ka14' AND LONGITUDE is not null ORDER BY " +
+      "STARTTIME LIMIT 1")
+      .collect()(0)
+      .toString()
+    assert(StringUtils.countMatches(plan, "TakeOrderedAndProject") == 2)
+
+    // b. For all projections
+    plan = sql(
+      "explain SELECT * FROM table2 WHERE CARNO = 'ka14' AND LONGITUDE is not null ORDER BY " +
+      "STARTTIME LIMIT 1")
+      .collect()(0)
+      .toString()
+    assert(StringUtils.countMatches(plan, "TakeOrderedAndProject") == 2)
+
+    // Don't allow order by and limit push down as the order by column is not an SI column
+    plan = sql(
+      "explain SELECT * FROM table2 WHERE CARNO = 'ka14' AND LONGITUDE is not null ORDER BY " +
+      "endtime LIMIT 1")
+      .collect()(0)
+      .toString()
+    assert(StringUtils.countMatches(plan, "TakeOrderedAndProject") == 1)
+
+    // Don't allow order by and limit push down as the filter column is not an SI column
+    plan = sql(
+      "explain SELECT * FROM table2 WHERE imsi = 'aa' AND LONGITUDE is not null ORDER BY " +
+      "STARTTIME LIMIT 1")
+      .collect()(0)
+      .toString()
+    assert(StringUtils.countMatches(plan, "TakeOrderedAndProject") == 1)
+
+    // A NotEqualTo filter alone should not be pushed down to SI without order by
+    plan = sql(
+      "explain SELECT * FROM table2 WHERE CARNO != 'ka14' ")
+      .collect()(0)
+      .toString()
+    assert(!plan.contains("table2_index1"))
+
+    // NotEqualTo should not be pushed down to SI without order by, even when multiple index tables exist
+    plan = sql(
+      "explain SELECT * FROM table2 WHERE CARNO = 'ka14' and CITY != 'ddd' ")
+      .collect()(0)
+      .toString()
+    assert(!plan.contains("table2_index2") && plan.contains("table2_index1"))
+
+    sql("drop table table2")
+  }
+
   override def afterAll {
     sql("drop index si_altercolumn on table_WithSIAndAlter")
     sql("drop table if exists table_WithSIAndAlter")
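The assertions above hinge on counting "TakeOrderedAndProject" nodes in the explain output: an ORDER BY ... LIMIT query plans exactly one such node over the main table, and a second one appears only when the order by and limit have also been pushed down to the secondary-index scan, so the limit is applied on the smaller index table before joining back. A minimal sketch of that check as a reusable helper, assuming a SparkSession value named `spark`; the object and method names are hypothetical (the test above inlines the same logic):

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.SparkSession

object SIPlanChecks {
  // Runs EXPLAIN on the query and counts TakeOrderedAndProject occurrences.
  // Two occurrences mean the ORDER BY ... LIMIT was evaluated on the index
  // table first, so only the limited row set is joined back to the main table.
  def isOrderByLimitPushedToSI(spark: SparkSession, query: String): Boolean = {
    val plan = spark.sql("explain " + query).collect()(0).getString(0)
    StringUtils.countMatches(plan, "TakeOrderedAndProject") == 2
  }
}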
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.secondaryindex.optimizer
 
 import org.apache.log4j.Logger
-import org.apache.spark.sql.{CarbonUtils, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonUtils, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -28,6 +28,7 @@ import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.index.IndexType
 
 /**
  * Rule for rewriting plan if query has a filter on index table column
@@ -41,7 +42,17 @@ class CarbonSITransformationRule(sparkSession: SparkSession)
     new CarbonSecondaryIndexOptimizer(sparkSession)
 
   def apply(plan: LogicalPlan): LogicalPlan = {
-    if (checkIfRuleNeedToBeApplied(plan)) {
+    // Only run the rewrite if some relation in the plan is a Carbon table with SI tables
+    var hasSecondaryIndexTable = false
+    plan.collect {
+      case l: LogicalRelation if !hasSecondaryIndexTable &&
+          l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        hasSecondaryIndexTable = l.relation
+          .asInstanceOf[CarbonDatasourceHadoopRelation]
+          .carbonTable
+          .getIndexTableNames(IndexType.SI.getIndexProviderName).size() > 0
+    }
+    if (hasSecondaryIndexTable && checkIfRuleNeedToBeApplied(plan)) {
       secondaryIndexOptimizer.transformFilterToJoin(plan, isProjectionNeeded(plan))
     } else {
       plan
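The guard added here short-circuits the rule: the plan is scanned once for a CarbonDatasourceHadoopRelation whose table carries at least one secondary index, and the comparatively expensive filter-to-join rewrite in transformFilterToJoin runs only when one is found. A condensed sketch of the same pattern as a standalone predicate; the class names are the ones referenced in the diff, while the wrapper object and method are illustrative:

import org.apache.carbondata.core.metadata.index.IndexType
import org.apache.spark.sql.CarbonDatasourceHadoopRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation

object SIRuleGuard {
  // True if any relation in the plan is a Carbon table that has at least one
  // secondary index; plans over plain tables skip the rewrite entirely.
  def hasSecondaryIndexTable(plan: LogicalPlan): Boolean = {
    plan.collect {
      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
          .carbonTable
          .getIndexTableNames(IndexType.SI.getIndexProviderName)
          .size() > 0
    }.exists(identity)
  }
}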
