Skip to content

Commit

Permalink
[CARBONDATA-3407]Fix distinct, count, Sum query failure when MV is cr…
Browse files Browse the repository at this point in the history
…eated on single projection column

Problem:
when MV datamap is created on single column as simple projection, sum, distinct,count queries are failing during sql conversion of modular plan. Basically there is no case to handle the modular plan when we have group by node without alias info and has select child node which is rewritten.

Solution:
the sql generation cases should take this case also, after that the rewritten query will wrong as alias will be present inside count or aggregate function.
So actually rewritten query should be like:
SELECT count(limit_fail_dm1_table.limit_fail_designation) AS count(designation) FROM default.limit_fail_dm1_table

This closes #3249
  • Loading branch information
akashrn5 authored and ravipesala committed Jun 6, 2019
1 parent 85f1b9f commit b0d5a5c
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 3 deletions.
Expand Up @@ -79,7 +79,7 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
if (needAnalysis && catalog != null && isValidPlan(plan, catalog)) {
val modularPlan = catalog.mvSession.sessionState.rewritePlan(plan).withMVTable
if (modularPlan.find (_.rewritten).isDefined) {
if (modularPlan.find(_.rewritten).isDefined) {
val compactSQL = modularPlan.asCompactSQL
val analyzed = sparkSession.sql(compactSQL).queryExecution.analyzed
analyzed
Expand Down
Expand Up @@ -1041,6 +1041,26 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
assert(verifyMVDataMap(analyzed2, "mvlikedm2"))
}

test("test distinct, count, sum on MV with single projection column") {
sql("drop table if exists maintable")
sql("create table maintable(name string, age int, add string) stored by 'carbondata'")
sql("create datamap single_mv using 'mv' as select age from maintable")
sql("insert into maintable select 'pheobe',31,'NY'")
sql("insert into maintable select 'rachel',32,'NY'")
val df1 = sql("select distinct(age) from maintable")
val df2 = sql("select sum(age) from maintable")
val df3 = sql("select count(age) from maintable")
val analyzed1 = df1.queryExecution.analyzed
val analyzed2 = df2.queryExecution.analyzed
val analyzed3 = df3.queryExecution.analyzed
checkAnswer(df1, Seq(Row(31), Row(32)))
checkAnswer(df2, Seq(Row(63)))
checkAnswer(df3, Seq(Row(2)))
assert(TestUtil.verifyMVDataMap(analyzed1, "single_mv"))
assert(TestUtil.verifyMVDataMap(analyzed2, "single_mv"))
assert(TestUtil.verifyMVDataMap(analyzed3, "single_mv"))
}

def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
val tables = logicalPlan collect {
case l: LogicalRelation => l.catalogTable.get
Expand All @@ -1060,6 +1080,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table IF EXISTS fact_table_parquet")
sql("drop table if exists limit_fail")
sql("drop table IF EXISTS mv_like")
sql("drop table IF EXISTS maintable")
}

override def afterAll {
Expand Down
Expand Up @@ -161,7 +161,7 @@ trait SQLBuildDSL {
extractRewrittenOrNonRewrittenSelectGroupBySelect(s1, g, s2, alias)

case g@modular.GroupBy(_, _, _, _, s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)
if (g.alias.isEmpty && !s2.rewritten) =>
if g.alias.isEmpty =>
val fragmentList = s2.children.zipWithIndex
.map { case (child, index) => fragmentExtract(child, s2.aliasMap.get(index)) }
val fList = s2.joinEdges.map {
Expand Down
Expand Up @@ -124,7 +124,11 @@ class SQLBuilder private(
}
} else {
attrMap.get(ref) match {
case Some(alias) => Alias(alias.child, alias.name)(exprId = alias.exprId)
case Some(alias) =>
AttributeReference(
alias.child.asInstanceOf[AttributeReference].name,
ref.dataType)(exprId = ref.exprId,
alias.child.asInstanceOf[AttributeReference].qualifier)
case None => ref
}
}
Expand Down

0 comments on commit b0d5a5c

Please sign in to comment.