Skip to content

Commit

Permalink
[CARBONDATA-3433]Fix MV issues related to duplicate columns, limit an…
Browse files Browse the repository at this point in the history
…d constant columns

Problem:
MV has below issues:
when there are duplicate columns in the select query, MV creation fails, even though the select itself is a valid query
when a constant column is used in the CTAS query for datamap creation, it fails
when limit is used in the CTAS query for datamap creation, it fails

Solution:
since duplicate columns in a query are valid, MV should support them; so when creating the MV table columns, take only the distinct columns
handle building the field relation map when the query contains a constant column
block MV creation for limit ctas query, as it is not a valid case to use MV datamap.

This closes #3285
  • Loading branch information
akashrn5 authored and kunal642 committed Jun 18, 2019
1 parent 49b77d0 commit 581591a
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 17 deletions.
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, NamedExpression, ScalaUDF, SortOrder}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, Limit, LogicalPlan, Project}
import org.apache.spark.sql.execution.command.{Field, PartitionerField, TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
import org.apache.spark.sql.execution.datasources.LogicalRelation
Expand Down Expand Up @@ -65,13 +65,22 @@ object MVHelper {
val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
val query = sparkSession.sql(updatedQuery)
val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
// if there is limit in MV ctas query string, throw exception, as its not a valid usecase
logicalPlan match {
case Limit(_, _) =>
throw new MalformedCarbonCommandException("MV datamap does not support the query with " +
"limit")
case _ =>
}
val selectTables = getTables(logicalPlan)
if (selectTables.isEmpty) {
throw new MalformedCarbonCommandException(
s"Non-Carbon table does not support creating MV datamap")
}
val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
val fullRebuild = isFullReload(logicalPlan)
// the ctas query can have duplicate columns, so we should take distinct and create fields,
// so that it won't fail during create mv table
val fields = logicalPlan.output.map { attr =>
if (attr.dataType.isInstanceOf[ArrayType] || attr.dataType.isInstanceOf[StructType] ||
attr.dataType.isInstanceOf[MapType]) {
Expand All @@ -96,7 +105,8 @@ object MVHelper {
children = None,
rawSchema = rawSchema)
}
}
}.distinct

val tableProperties = mutable.Map[String, String]()
val parentTables = new util.ArrayList[String]()
val parentTablesList = new util.ArrayList[CarbonTable](selectTables.size)
Expand Down Expand Up @@ -403,7 +413,9 @@ object MVHelper {

def getAttributeMap(subsumer: Seq[NamedExpression],
subsume: Seq[NamedExpression]): Map[AttributeKey, NamedExpression] = {
if (subsumer.length == subsume.length) {
// when datamap is created with duplicate columns like select sum(age),sum(age) from table,
// the subsumee will have duplicate, so handle that case here
if (subsumer.length == subsume.groupBy(_.name).size) {
subsume.zip(subsumer).flatMap { case (left, right) =>
var tuples = left collect {
case attr: AttributeReference =>
Expand Down
Expand Up @@ -129,15 +129,20 @@ object MVUtil {
val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
agg.collect {
case Alias(attr: AggregateExpression, name) =>
if (attr.aggregateFunction.isInstanceOf[Count]) {
var isLiteralPresent = false
attr.aggregateFunction.collect {
case l@Literal(_, _) =>
isLiteralPresent = true
}
if (isLiteralPresent) {
fieldToDataMapFieldMap +=
getFieldToDataMapFields(name,
attr.aggregateFunction.dataType,
None,
attr.aggregateFunction.nodeName,
arrayBuffer,
"")
aggregateType = "count"
aggregateType = attr.aggregateFunction.nodeName
} else {
aggregateType = attr.aggregateFunction.nodeName
}
Expand Down
Expand Up @@ -19,8 +19,6 @@ package org.apache.carbondata.mv.rewrite
import java.io.File

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

Expand Down Expand Up @@ -992,7 +990,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {

var frame = sql(querySQL)
var analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "all_table_mv"))
assert(TestUtil.verifyMVDataMap(analyzed, "all_table_mv"))
assert(2 == frame.collect().size)
frame.collect().foreach { each =>
if (1 == each.get(0)) {
Expand All @@ -1008,7 +1006,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {

frame = sql(querySQL2)
analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "all_table_mv"))
assert(TestUtil.verifyMVDataMap(analyzed, "all_table_mv"))
assert(1 == frame.collect().size)
frame.collect().foreach { each =>
if (2 == each.get(0)) {
Expand All @@ -1034,11 +1032,11 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
val df1 = sql(
"select name,address from mv_like where Country NOT LIKE 'US' group by name,address")
val analyzed1 = df1.queryExecution.analyzed
assert(verifyMVDataMap(analyzed1, "mvlikedm1"))
assert(TestUtil.verifyMVDataMap(analyzed1, "mvlikedm1"))
val df2 = sql(
"select name,address,Country from mv_like where Country = 'US' or Country = 'China' group by name,address,Country")
val analyzed2 = df2.queryExecution.analyzed
assert(verifyMVDataMap(analyzed2, "mvlikedm2"))
assert(TestUtil.verifyMVDataMap(analyzed2, "mvlikedm2"))
}

test("test distinct, count, sum on MV with single projection column") {
Expand Down Expand Up @@ -1074,11 +1072,24 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists mvtable1")
}

def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
val tables = logicalPlan collect {
case l: LogicalRelation => l.catalogTable.get
}
tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName + "_table"))
test("test mv with duplicate columns in query and constant column") {
  sql("drop table if exists maintable")
  sql("create table maintable(name string, age int, add string) stored by 'carbondata'")
  sql("create datamap dupli_mv using 'mv' as select name, sum(age),sum(age) from maintable group by name")
  sql("create datamap constant_mv using 'mv' as select name, sum(1) ex1 from maintable group by name")
  sql("insert into maintable select 'pheobe',31,'NY'")
  // Each user query should be rewritten by the optimizer to hit the expected MV datamap:
  // duplicate-aggregate queries map to dupli_mv, constant-column queries to constant_mv.
  val queryToExpectedDataMap = Seq(
    "select sum(age),name from maintable group by name" -> "dupli_mv",
    "select sum(age),sum(age),name from maintable group by name" -> "dupli_mv",
    "select name, sum(1) ex1 from maintable group by name" -> "constant_mv",
    "select sum(1) ex1 from maintable group by name" -> "constant_mv")
  queryToExpectedDataMap.foreach { case (query, dataMapName) =>
    val analyzedPlan = sql(query).queryExecution.analyzed
    assert(TestUtil.verifyMVDataMap(analyzedPlan, dataMapName))
  }
}

def drop(): Unit = {
Expand Down
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.carbondata.mv.rewrite

import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
Expand All @@ -42,6 +42,13 @@ class MVExceptionTestCase extends QueryTest with BeforeAndAfterAll {
assertResult("DataMap with name main_table_mv1 already exists in storage")(ex.getMessage)
}

test("test mv creation with limit in query") {
  // MV creation must be rejected when the CTAS query contains a limit clause,
  // since a limited result set cannot serve as a materialized view.
  val ctasWithLimit = "create datamap maintable_mv2 on table main_table using 'mv' as " +
    "select sum(age),name from main_table group by name limit 10"
  val ex = intercept[MalformedCarbonCommandException] {
    sql(ctasWithLimit)
  }
  assertResult("MV datamap does not support the query with limit")(ex.getMessage)
}

def drop(): Unit = {
sql("drop table IF EXISTS main_table")
sql("drop table if exists main_table_error")
Expand Down

0 comments on commit 581591a

Please sign in to comment.