Skip to content

Commit

Permalink
Merge 3cd48b9 into 7541ef2
Browse files Browse the repository at this point in the history
  • Loading branch information
akashrn5 committed May 28, 2019
2 parents 7541ef2 + 3cd48b9 commit 28cda62
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 0 deletions.
Expand Up @@ -954,6 +954,23 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists all_table")
}

// Regression test: when "select *" is combined with LIMIT, Spark's ColumnPruning
// removes the Project node, which previously broke MV modularization. Verify that
// both the distinct query and the bare "select * ... limit" query work, and that
// the distinct query is rewritten to use the MV datamap.
test("test select * and distinct when MV is enabled") {
  sql("drop table if exists limit_fail")
  sql("CREATE TABLE limit_fail (empname String, designation String, doj Timestamp,workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)STORED BY 'org.apache.carbondata.format'")
  sql(s"LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE limit_fail OPTIONS" +
      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')")
  sql("create datamap limit_fail_dm1 using 'mv' as select empname,designation from limit_fail")
  // No try/catch here: the previous catch-all turned any exception into a bare
  // assert(false), discarding the stack trace. Letting exceptions propagate makes
  // the test framework report the real failure cause.
  val df = sql("select distinct(empname) from limit_fail limit 10")
  // Exercise the "select * ... limit" path whose Project node is pruned away.
  sql("select * from limit_fail limit 10").show()
  val analyzed = df.queryExecution.analyzed
  // The analyzed plan of the distinct query must hit the MV datamap.
  assert(verifyMVDataMap(analyzed, "limit_fail_dm1"))
}

def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
val tables = logicalPlan collect {
case l: LogicalRelation => l.catalogTable.get
Expand All @@ -971,6 +988,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table IF EXISTS fact_streaming_table1")
sql("drop table IF EXISTS fact_streaming_table2")
sql("drop table IF EXISTS fact_table_parquet")
sql("drop table if exists limit_fail")
}

override def afterAll {
Expand Down
Expand Up @@ -19,6 +19,7 @@ package org.apache.carbondata.mv.plans.modular

import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, PredicateHelper, _}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.LogicalRelation

import org.apache.carbondata.mv.plans.{Pattern, _}
import org.apache.carbondata.mv.plans.modular.Flags._
Expand Down Expand Up @@ -118,6 +119,15 @@ abstract class ModularPatterns extends Modularizer[ModularPlan] {
makeSelectModule(output, input, predicate, aliasmap, joinedge, flags,
children.map(modularizeLater), Seq(Seq(limitExpr)) ++ fspec1, wspec)

// When "select *" is used with LIMIT, the ColumnPruning rule drops the Project
// node, leaving Limit directly over the relation. Hand the whole Limit plan to
// ExtractSelectModule so it can still synthesize the select node.
// The relation child is only matched for its type, so bind it with a wildcard
// instead of an unused name.
case limit@Limit(limitExpr, _: LogicalRelation) =>
val (output, input, predicate, aliasmap, joinedge, children, flags1,
fspec1, wspec) = ExtractSelectModule.unapply(limit).get
// NOTE(review): .get is presumed safe because ExtractSelectModule has an explicit
// Limit-over-LogicalRelation case — confirm unapply is total for this shape.
val flags = flags1.setFlag(LIMIT)
makeSelectModule(output, input, predicate, aliasmap, joinedge, flags,
children.map(modularizeLater), Seq(Seq(limitExpr)) ++ fspec1, wspec)

case Limit(
limitExpr,
ExtractSelectModule(output, input, predicate, aliasmap, joinedge, children, flags1,
Expand Down
Expand Up @@ -132,6 +132,13 @@ object ExtractSelectModule extends PredicateHelper {
s"\n right child ${ right }")
}

// When "select *" is executed with LIMIT, the ColumnPruning rule removes the
// Project node from the optimized plan. If the child of Limit is a bare relation,
// build the select-module tuple directly from the relation's output.
// limitExpr is not part of the returned tuple (the caller re-extracts it from the
// Limit node), so bind it with a wildcard.
case Limit(_, lr: LogicalRelation) =>
(lr.output, lr.output, Nil, Nil, Seq(lr), true, Map.empty, NoFlags, Seq.empty, Seq.empty)

case other =>
(other.output, other.output, Nil, Nil, Seq(other), false, Map.empty, NoFlags, Seq.empty, Seq
.empty)
Expand Down

0 comments on commit 28cda62

Please sign in to comment.