Skip to content

Commit

Permalink
Merge 05696bc into fa3e392
Browse files Browse the repository at this point in the history
  • Loading branch information
akashrn5 committed May 31, 2019
2 parents fa3e392 + 05696bc commit 339c401
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 3 deletions.
Expand Up @@ -79,7 +79,7 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
if (needAnalysis && catalog != null && isValidPlan(plan, catalog)) {
val modularPlan = catalog.mvSession.sessionState.rewritePlan(plan).withMVTable
if (modularPlan.find (_.rewritten).isDefined) {
if (modularPlan.find(_.rewritten).isDefined) {
val compactSQL = modularPlan.asCompactSQL
val analyzed = sparkSession.sql(compactSQL).queryExecution.analyzed
analyzed
Expand Down
Expand Up @@ -1022,6 +1022,26 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists all_table")
}

test("test distinct, count, sum on MV with single projection column") {
sql("drop table if exists maintable")
sql("create table maintable(name string, age int, add string) stored by 'carbondata'")
sql("create datamap single_mv using 'mv' as select age from maintable")
sql("insert into maintable select 'pheobe',31,'NY'")
sql("insert into maintable select 'rachel',32,'NY'")
val df1 = sql("select distinct(age) from maintable")
val df2 = sql("select sum(age) from maintable")
val df3 = sql("select count(age) from maintable")
val analyzed1 = df1.queryExecution.analyzed
val analyzed2 = df2.queryExecution.analyzed
val analyzed3 = df3.queryExecution.analyzed
checkAnswer(df1, Seq(Row(31), Row(32)))
checkAnswer(df2, Seq(Row(63)))
checkAnswer(df3, Seq(Row(2)))
assert(TestUtil.verifyMVDataMap(analyzed1, "single_mv"))
assert(TestUtil.verifyMVDataMap(analyzed2, "single_mv"))
assert(TestUtil.verifyMVDataMap(analyzed3, "single_mv"))
}

def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
val tables = logicalPlan collect {
case l: LogicalRelation => l.catalogTable.get
Expand All @@ -1040,6 +1060,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table IF EXISTS fact_streaming_table2")
sql("drop table IF EXISTS fact_table_parquet")
sql("drop table if exists limit_fail")
sql("drop table IF EXISTS maintable")
}

override def afterAll {
Expand Down
Expand Up @@ -161,7 +161,7 @@ trait SQLBuildDSL {
extractRewrittenOrNonRewrittenSelectGroupBySelect(s1, g, s2, alias)

case g@modular.GroupBy(_, _, _, _, s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)
if (g.alias.isEmpty && !s2.rewritten) =>
if g.alias.isEmpty =>
val fragmentList = s2.children.zipWithIndex
.map { case (child, index) => fragmentExtract(child, s2.aliasMap.get(index)) }
val fList = s2.joinEdges.map {
Expand Down
Expand Up @@ -124,7 +124,11 @@ class SQLBuilder private(
}
} else {
attrMap.get(ref) match {
case Some(alias) => Alias(alias.child, alias.name)(exprId = alias.exprId)
case Some(alias) =>
AttributeReference(
alias.child.asInstanceOf[AttributeReference].name,
ref.dataType)(exprId = ref.exprId,
alias.child.asInstanceOf[AttributeReference].qualifier)
case None => ref
}
}
Expand Down

0 comments on commit 339c401

Please sign in to comment.