Skip to content

Commit

Permalink
[CARBONDATA-3444]Fix MV query failure when column name and table name…
Browse files Browse the repository at this point in the history
… is same in case of join scenario

Problem:
when there are columns with same in different table, after sql generation, the project column will be like gen_subsumer_0.product ,
it fails during logical plan generation from rewritten query, as column names will be ambigous

Solution:
update the outputlist when there are duplicate columns present in query. Here we can form the qualified name for the Attribute reference.
So when qualifier is defined for column, the qualified name wil be like <col_qualifier_name>_<col.name>,
if qualifier is not defined, then it will be <col_exprId_id>_<col.name>. So update for all the nodes like groupby , select nodes,
so that it will be handled when there will be amguity in columns.

This closes #3297
  • Loading branch information
akashrn5 authored and kumarvishal09 committed Jun 21, 2019
1 parent ed862ce commit 2b0e79c
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,9 @@ object MVHelper {
val relation =
s.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select]
val outputList = getUpdatedOutputList(relation.outputList, s.dataMapTableRelation)
val mappings = s.outputList zip outputList
// when the output list contains multiple projection of same column, but relation
// contains distinct columns, mapping may go wrong with columns, so select distinct
val mappings = s.outputList.distinct zip outputList
val oList = for ((o1, o2) <- mappings) yield {
if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,4 +310,45 @@ object MVUtil {
" are not allowed for this datamap")
}
}

def updateDuplicateColumns(outputList: Seq[NamedExpression]): Seq[NamedExpression] = {
val duplicateNameCols = outputList.groupBy(_.name).filter(_._2.length > 1).flatMap(_._2)
.toList
val updatedOutList = outputList.map { col =>
val duplicateColumn = duplicateNameCols
.find(a => a.semanticEquals(col))
val qualifiedName = col.qualifier.getOrElse(s"${ col.exprId.id }") + "_" + col.name
if (duplicateColumn.isDefined) {
val attributesOfDuplicateCol = duplicateColumn.get.collect {
case a: AttributeReference => a
}
val attributeOfCol = col.collect { case a: AttributeReference => a }
// here need to check the whether the duplicate columns is of same tables,
// since query with duplicate columns is valid, we need to make sure, not to change their
// names with above defined qualifier name, for example in case of some expression like
// cast((FLOOR((cast(col_name) as double))).., upper layer even exprid will be same,
// we need to find the attribute ref(col_name) at lower level and check where expid is same
// or of same tables, so doin the semantic equals
val isStrictDuplicate = attributesOfDuplicateCol.forall(expr =>
attributeOfCol.exists(a => a.semanticEquals(expr)))
if (!isStrictDuplicate) {
Alias(col, qualifiedName)(exprId = col.exprId)
} else if (col.qualifier.isDefined) {
Alias(col, qualifiedName)(exprId = col.exprId)
// this check is added in scenario where the column is direct Attribute reference and
// since duplicate columns select is allowed, we should just put alias for those columns
// and update, for this also above isStrictDuplicate will be true so, it will not be
// updated above
} else if (duplicateColumn.get.isInstanceOf[AttributeReference] &&
col.isInstanceOf[AttributeReference]) {
Alias(col, qualifiedName)(exprId = col.exprId)
} else {
col
}
} else {
col
}
}
updatedOutList
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
exprListR.exists(a1 => a1.isInstanceOf[Alias] &&
a1.asInstanceOf[Alias].child.semanticEquals(a.child)) ||
exprListR.exists(_.semanticEquals(exprE) || canEvaluate(exprE, subsumer))
case exp => exprListR.exists(_.semanticEquals(exp) || canEvaluate(exp, subsumer))
case exp =>
exprListR.exists(a1 => a1.isInstanceOf[Alias] &&
a1.asInstanceOf[Alias].child.semanticEquals(exp)) ||
exprListR.exists(_.semanticEquals(exprE) || canEvaluate(exprE, subsumer))
}
} else {
false
Expand Down Expand Up @@ -244,8 +247,7 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
val tAliasMap = new collection.mutable.HashMap[Int, String]()

val updatedOutList: Seq[NamedExpression] = updateDuplicateColumns(sel_1a)
val usel_1a = sel_1a.copy(outputList = updatedOutList)
val usel_1a = sel_1a.copy(outputList = sel_1a.outputList)
tChildren += usel_1a
tAliasMap += (tChildren.indexOf(usel_1a) -> rewrite.newSubsumerName())

Expand Down Expand Up @@ -350,18 +352,6 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
}
}

private def updateDuplicateColumns(sel_1a: Select) = {
val duplicateNameCols = sel_1a.outputList.groupBy(_.name).filter(_._2.length > 1).flatMap(_._2)
.toList
val updatedOutList = sel_1a.outputList.map { col =>
if (duplicateNameCols.contains(col)) {
Alias(col, col.qualifiedName)(exprId = col.exprId)
} else {
col
}
}
updatedOutList
}
}

object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.carbondata.mv.rewrite

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}

import org.apache.carbondata.mv.datamap.MVUtil
import org.apache.carbondata.mv.expressions.modular._
import org.apache.carbondata.mv.plans.modular
import org.apache.carbondata.mv.plans.modular._
Expand Down Expand Up @@ -121,11 +122,16 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession)
dataMapRelation: ModularPlan): ModularPlan = {
// Update datamap table relation to the subsumer modular plan
val updatedSubsumer = subsumer match {
// In case of order by it adds extra select but that can be ignored while doing selection.
case s@ Select(_, _, _, _, _, Seq(g: GroupBy), _, _, _, _) =>
s.copy(children = Seq(g.copy(dataMapTableRelation = Some(dataMapRelation))))
case s: Select => s.copy(dataMapTableRelation = Some(dataMapRelation))
case g: GroupBy => g.copy(dataMapTableRelation = Some(dataMapRelation))
// In case of order by it adds extra select but that can be ignored while doing selection.
case s@Select(_, _, _, _, _, Seq(g: GroupBy), _, _, _, _) =>
s.copy(children = Seq(g.copy(dataMapTableRelation = Some(dataMapRelation))),
outputList = MVUtil.updateDuplicateColumns(s.outputList))
case s: Select => s
.copy(dataMapTableRelation = Some(dataMapRelation),
outputList = MVUtil.updateDuplicateColumns(s.outputList))
case g: GroupBy => g
.copy(dataMapTableRelation = Some(dataMapRelation),
outputList = MVUtil.updateDuplicateColumns(g.outputList))
case other => other
}
(updatedSubsumer, subsumee) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,32 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"drop datamap datamap29")
}

test("test create datamap with join with group by and projection with filter") {
sql("drop datamap if exists datamap29")
sql("create datamap datamap29 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
val frame = sql(
"select t1.empname ,t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where " +
"t1.empname = t2.empname and t1.empname='shivani' group by t2.designation,t1.empname ")
val analyzed = frame.queryExecution.analyzed
assert(TestUtil.verifyMVDataMap(analyzed, "datamap29"))
checkAnswer(frame, sql("select t1.empname ,t2.designation, sum(t1.utilization) from fact_table4 t1,fact_table5 t2 where " +
"t1.empname = t2.empname and t1.empname='shivani' group by t2.designation,t1.empname "))
sql(s"drop datamap datamap29")
}

test("test create datamap with join with group by and sub projection with filter with alias") {
sql("drop datamap if exists datamap29")
sql("create datamap datamap29 using 'mv' as select t1.empname as a, t2.designation as b, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
val frame = sql(
"select t1.empname ,t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where " +
"t1.empname = t2.empname and t1.empname='shivani' group by t2.designation,t1.empname ")
val analyzed = frame.queryExecution.analyzed
assert(TestUtil.verifyMVDataMap(analyzed, "datamap29"))
checkAnswer(frame, sql("select t1.empname ,t2.designation, sum(t1.utilization) from fact_table4 t1,fact_table5 t2 where " +
"t1.empname = t2.empname and t1.empname='shivani' group by t2.designation,t1.empname "))
sql(s"drop datamap datamap29")
}

ignore("test create datamap with join with group by with filter") {
sql("drop datamap if exists datamap30")
sql("create datamap datamap30 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
Expand Down Expand Up @@ -1076,22 +1102,41 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists maintable")
sql("create table maintable(name string, age int, add string) stored by 'carbondata'")
sql("create datamap dupli_mv using 'mv' as select name, sum(age),sum(age) from maintable group by name")
sql("create datamap dupli_projection using 'mv' as select age, age,add from maintable")
sql("create datamap constant_mv using 'mv' as select name, sum(1) ex1 from maintable group by name")
sql("insert into maintable select 'pheobe',31,'NY'")
val df1 = sql("select sum(age),name from maintable group by name")
val df2 = sql("select sum(age),sum(age),name from maintable group by name")
val df3 = sql("select name, sum(1) ex1 from maintable group by name")
val df4 = sql("select sum(1) ex1 from maintable group by name")
val df5 = sql("select age,age,add from maintable")
val df6 = sql("select age,add from maintable")
val analyzed1 = df1.queryExecution.analyzed
val analyzed2 = df2.queryExecution.analyzed
val analyzed3 = df3.queryExecution.analyzed
val analyzed4 = df4.queryExecution.analyzed
val analyzed5 = df5.queryExecution.analyzed
val analyzed6 = df6.queryExecution.analyzed
assert(TestUtil.verifyMVDataMap(analyzed1, "dupli_mv"))
assert(TestUtil.verifyMVDataMap(analyzed2, "dupli_mv"))
assert(TestUtil.verifyMVDataMap(analyzed3, "constant_mv"))
assert(TestUtil.verifyMVDataMap(analyzed4, "constant_mv"))
assert(TestUtil.verifyMVDataMap(analyzed5, "dupli_projection"))
assert(TestUtil.verifyMVDataMap(analyzed6, "dupli_projection"))
}

test("test mv query when the column names and table name same in join scenario") {
sql("drop table IF EXISTS price")
sql("drop table IF EXISTS quality")
sql("create table price(product string,price int) stored by 'carbondata'")
sql("create table quality(product string,quality string) stored by 'carbondata'")
sql("create datamap same_mv using 'mv' as select price.product,price.price,quality.product,quality.quality from price,quality where price.product = quality.product")
val df1 = sql("select price.product from price,quality where price.product = quality.product")
val analyzed1 = df1.queryExecution.analyzed
assert(TestUtil.verifyMVDataMap(analyzed1, "same_mv"))
}


def drop(): Unit = {
sql("drop table IF EXISTS fact_table1")
sql("drop table IF EXISTS fact_table2")
Expand Down

0 comments on commit 2b0e79c

Please sign in to comment.