Skip to content

Commit

Permalink
Fix block complex data type and validate dmproperties
Browse files Browse the repository at this point in the history
  • Loading branch information
Indhumathi27 committed May 29, 2019
1 parent fcca6c5 commit 07e4a56
Show file tree
Hide file tree
Showing 17 changed files with 282 additions and 237 deletions.
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.command.{Field, PartitionerField, TableMod
import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.types.{ArrayType, MapType, StructType}
import org.apache.spark.util.{DataMapUtil, PartitionUtils}

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
Expand Down Expand Up @@ -60,6 +61,7 @@ object MVHelper {
s"MV datamap does not support streaming"
)
}
MVUtil.validateDMProperty(dmProperties)
val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
val query = sparkSession.sql(updatedQuery)
val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
Expand All @@ -71,6 +73,11 @@ object MVHelper {
val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
val fullRebuild = isFullReload(logicalPlan)
val fields = logicalPlan.output.map { attr =>
if (attr.dataType.isInstanceOf[ArrayType] || attr.dataType.isInstanceOf[StructType] ||
attr.dataType.isInstanceOf[MapType]) {
throw new UnsupportedOperationException(
s"MV datamap is unsupported for ComplexData type column: " + attr.name)
}
val name = updateColumnName(attr)
val rawSchema = '`' + name + '`' + ' ' + attr.dataType.typeName
if (attr.dataType.typeName.startsWith("decimal")) {
Expand Down Expand Up @@ -312,13 +319,19 @@ object MVHelper {
modularPlan.asCompactSQL
}

/**
 * Rewrites `name` by applying a fixed list of literal substitutions:
 * "(", " " and "." become "_", while ")", "=", "," and "`" are removed.
 *
 * @param name the raw name to rewrite
 * @return the rewritten name
 */
def getUpdatedName(name: String): String = {
  // (target, replacement) pairs, applied one after another in this order.
  val substitutions = Seq(
    "(" -> "_",
    ")" -> "",
    " " -> "_",
    "=" -> "",
    "," -> "",
    "." -> "_",
    "`" -> "")
  substitutions.foldLeft(name) { case (current, (target, replacement)) =>
    current.replace(target, replacement)
  }
}

/**
 * Builds the column name for `attr`: the attribute name rewritten by
 * `getUpdatedName`, prefixed with the qualifier and "_" when the attribute
 * carries a qualifier.
 *
 * @param attr the attribute whose name is rewritten
 * @return the rewritten, optionally qualifier-prefixed, name
 */
def updateColumnName(attr: Attribute): String = {
  // `name` is defined exactly once, through the shared helper, so the
  // substitution rules live in a single place.
  val name = getUpdatedName(attr.name)
  attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name)
}

Expand Down
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.DataType

import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.spark.util.CommonUtil

Expand Down Expand Up @@ -108,6 +109,8 @@ object MVUtil {
fieldToDataMapFieldMap +=
getFieldToDataMapFields(name, attr.dataType, None, "", arrayBuffer, "")
}
case a@Alias(_, name) =>
checkIfComplexDataTypeExists(a)
}
fieldToDataMapFieldMap
}
Expand Down Expand Up @@ -138,7 +141,8 @@ object MVUtil {
} else {
aggregateType = attr.aggregateFunction.nodeName
}
case Alias(_, name) =>
case a@Alias(_, name) =>
checkIfComplexDataTypeExists(a)
// In case of arithmetic expressions like sum(a)+sum(b)
aggregateType = "arithmetic"
}
Expand Down Expand Up @@ -251,12 +255,7 @@ object MVUtil {
aggregateType: String,
columnTableRelationList: ArrayBuffer[ColumnTableRelation],
parenTableName: String) = {
var actualColumnName =
name.replace("(", "_")
.replace(")", "")
.replace(" ", "_")
.replace("=", "")
.replace(",", "")
var actualColumnName = MVHelper.getUpdatedName(name)
if (qualifier.isDefined) {
actualColumnName = qualifier.map(qualifier => qualifier + "_" + name)
.getOrElse(actualColumnName)
Expand Down Expand Up @@ -285,4 +284,25 @@ object MVUtil {
rawSchema = rawSchema), dataMapField)
}
}

/**
 * Rejects an alias whose child expression is a map-value, struct-field or
 * array-item access.
 *
 * @param a the alias to inspect
 * @throws UnsupportedOperationException if the child is a GetMapValue,
 *                                       GetStructField or GetArrayItem
 */
private def checkIfComplexDataTypeExists(a: Alias): Unit = {
  a.child match {
    case _: GetMapValue | _: GetStructField | _: GetArrayItem =>
      throw new UnsupportedOperationException(
        s"MV datamap is unsupported for ComplexData type child column: " + a.child.simpleString)
    case _ => // any other child expression is accepted
  }
}

/**
 * Fails when `tableProperty` contains any key from a fixed list of property
 * names; keys are compared case-insensitively.
 *
 * @param tableProperty the datamap properties to check
 * @throws MalformedDataMapCommandException if at least one listed key is present
 */
def validateDMProperty(tableProperty: mutable.Map[String, String]): Unit = {
  val disallowedNames = Array(
    "dictionary_include",
    "dictionary_exclude",
    "sort_columns",
    "local_dictionary_include",
    "local_dictionary_exclude",
    "long_string_columns",
    "no_inverted_index",
    "inverted_index",
    "column_meta_cache",
    "range_column")

  def isDisallowed(key: String): Boolean = disallowedNames.exists(_.equalsIgnoreCase(key))

  val offending = tableProperty.filter { case (key, _) => isDisallowed(key) }
  if (offending.nonEmpty) {
    val offendingKeys = offending.keySet.mkString(",")
    throw new MalformedDataMapCommandException(
      "DMProperties " + offendingKeys + " are not allowed for this datamap")
  }
}
}
Expand Up @@ -43,7 +43,7 @@ class MVCoalesceTestCase extends QueryTest with BeforeAndAfterAll {
sql("rebuild datamap coalesce_test_main_mv")

val frame = sql("select coalesce(sum(id),0) as sumid,name from coalesce_test_main group by name")
assert(verifyMVDataMap(frame.queryExecution.analyzed, "coalesce_test_main_mv"))
assert(TestUtil.verifyMVDataMap(frame.queryExecution.analyzed, "coalesce_test_main_mv"))
checkAnswer(frame, Seq(Row(3, "tom"), Row(3, "lily")))

sql("drop datamap if exists coalesce_test_main_mv")
Expand All @@ -59,7 +59,7 @@ class MVCoalesceTestCase extends QueryTest with BeforeAndAfterAll {
assert("MV doesn't support Coalesce".equals(exception.getMessage))

val frame = sql("select coalesce(sum(id),0) as sumid,name from coalesce_test_main group by name")
assert(!verifyMVDataMap(frame.queryExecution.analyzed, "coalesce_test_main_mv"))
assert(!TestUtil.verifyMVDataMap(frame.queryExecution.analyzed, "coalesce_test_main_mv"))
checkAnswer(frame, Seq(Row(3, "tom"), Row(3, "lily")))

sql("drop datamap if exists coalesce_test_main_mv")
Expand All @@ -72,20 +72,22 @@ class MVCoalesceTestCase extends QueryTest with BeforeAndAfterAll {
sql("rebuild datamap coalesce_test_main_mv")

val frame = sql("select sum(coalesce(id,0)) as sumid,name from coalesce_test_main group by name")
assert(verifyMVDataMap(frame.queryExecution.analyzed, "coalesce_test_main_mv"))
assert(TestUtil.verifyMVDataMap(frame.queryExecution.analyzed, "coalesce_test_main_mv"))
checkAnswer(frame, Seq(Row(3, "tom"), Row(3, "lily")))

sql("drop datamap if exists coalesce_test_main_mv")
}

/** Suite teardown: delegates to `drop`. */
override def afterAll(): Unit = drop
}

/** Helper shared by the MV test suites. */
object TestUtil {

  /**
   * Reports whether `logicalPlan` contains a LogicalRelation whose catalog
   * table is named `dataMapName` + "_table" (compared case-insensitively).
   *
   * @param logicalPlan the plan to search
   * @param dataMapName the datamap name, without the "_table" suffix
   * @return true if a matching relation is found, false otherwise
   */
  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
    val expectedTable = dataMapName + "_table"
    // Relations without a catalog table are skipped instead of failing on Option.get.
    val tables = logicalPlan.collect {
      case l: LogicalRelation => l.catalogTable
    }.flatten
    tables.exists(_.identifier.table.equalsIgnoreCase(expectedTable))
  }
}
Expand Up @@ -79,14 +79,7 @@ class MVCountAndCaseTestCase extends QueryTest with BeforeAndAfterAll{
| GROUP BY MT.`3600`, MT.`2250410101`
| ORDER BY `3600` ASC LIMIT 5000""".stripMargin)

assert(verifyMVDataMap(frame.queryExecution.analyzed, "data_table_mv"))
}

def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
val tables = logicalPlan collect {
case l: LogicalRelation => l.catalogTable.get
}
tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
assert(TestUtil.verifyMVDataMap(frame.queryExecution.analyzed, "data_table_mv"))
}

override def afterAll(): Unit = {
Expand Down

0 comments on commit 07e4a56

Please sign in to comment.