Skip to content

Commit

Permalink
Fix distinct, count, Sum query failure when MV is created on single p…
Browse files Browse the repository at this point in the history
…rojection column
  • Loading branch information
akashrn5 committed May 31, 2019
1 parent fa3e392 commit 05696bc
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 3 deletions.
Expand Up @@ -79,7 +79,7 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
if (needAnalysis && catalog != null && isValidPlan(plan, catalog)) {
val modularPlan = catalog.mvSession.sessionState.rewritePlan(plan).withMVTable
if (modularPlan.find (_.rewritten).isDefined) {
if (modularPlan.find(_.rewritten).isDefined) {
val compactSQL = modularPlan.asCompactSQL
val analyzed = sparkSession.sql(compactSQL).queryExecution.analyzed
analyzed
Expand Down
Expand Up @@ -1022,6 +1022,26 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists all_table")
}

test("test distinct, count, sum on MV with single projection column") {
sql("drop table if exists maintable")
sql("create table maintable(name string, age int, add string) stored by 'carbondata'")
sql("create datamap single_mv using 'mv' as select age from maintable")
sql("insert into maintable select 'pheobe',31,'NY'")
sql("insert into maintable select 'rachel',32,'NY'")
val df1 = sql("select distinct(age) from maintable")
val df2 = sql("select sum(age) from maintable")
val df3 = sql("select count(age) from maintable")
val analyzed1 = df1.queryExecution.analyzed
val analyzed2 = df2.queryExecution.analyzed
val analyzed3 = df3.queryExecution.analyzed
checkAnswer(df1, Seq(Row(31), Row(32)))
checkAnswer(df2, Seq(Row(63)))
checkAnswer(df3, Seq(Row(2)))
assert(TestUtil.verifyMVDataMap(analyzed1, "single_mv"))
assert(TestUtil.verifyMVDataMap(analyzed2, "single_mv"))
assert(TestUtil.verifyMVDataMap(analyzed3, "single_mv"))
}

def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
val tables = logicalPlan collect {
case l: LogicalRelation => l.catalogTable.get
Expand All @@ -1040,6 +1060,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table IF EXISTS fact_streaming_table2")
sql("drop table IF EXISTS fact_table_parquet")
sql("drop table if exists limit_fail")
sql("drop table IF EXISTS maintable")
}

override def afterAll {
Expand Down
Expand Up @@ -161,7 +161,7 @@ trait SQLBuildDSL {
extractRewrittenOrNonRewrittenSelectGroupBySelect(s1, g, s2, alias)

case g@modular.GroupBy(_, _, _, _, s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)
if (g.alias.isEmpty && !s2.rewritten) =>
if g.alias.isEmpty =>
val fragmentList = s2.children.zipWithIndex
.map { case (child, index) => fragmentExtract(child, s2.aliasMap.get(index)) }
val fList = s2.joinEdges.map {
Expand Down
Expand Up @@ -124,7 +124,11 @@ class SQLBuilder private(
}
} else {
attrMap.get(ref) match {
case Some(alias) => Alias(alias.child, alias.name)(exprId = alias.exprId)
case Some(alias) =>
AttributeReference(
alias.child.asInstanceOf[AttributeReference].name,
ref.dataType)(exprId = ref.exprId,
alias.child.asInstanceOf[AttributeReference].qualifier)
case None => ref
}
}
Expand Down

0 comments on commit 05696bc

Please sign in to comment.