From d7808e7f6b83d21eee4e99cfdbd54d29c6d79fc1 Mon Sep 17 00:00:00 2001
From: Indhumathi27
Date: Tue, 18 Jun 2019 20:31:45 +0530
Subject: [PATCH] [CARBONDATA-3442]Fix creating mv datamap with column name having length more than 128

---
 .../core/constants/CarbonCommonConstants.java |  5 +++++
 .../carbondata/mv/datamap/MVHelper.scala      | 20 +++++++++++++-------
 .../apache/carbondata/mv/datamap/MVUtil.scala | 11 +++++++----
 .../mv/rewrite/MVCreateTestCase.scala         | 17 +++++++++++++++++
 4 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6833c8a8c0a..2f000d869a2 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2235,4 +2235,9 @@ private CarbonCommonConstants() {
    * index server temp file name
    */
   public static final String INDEX_SERVER_TEMP_FOLDER_NAME = "indexservertmp";
+
+  /**
+   * hive column-name maximum length
+   */
+  public static final int MAXIMUM_CHAR_LENGTH = 128;
 }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 4d4308837ff..b3e55934ef4 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -61,7 +61,8 @@ object MVHelper {
         s"MV datamap does not support streaming"
       )
     }
-    MVUtil.validateDMProperty(dmProperties)
+    val mvUtil = new MVUtil
+    mvUtil.validateDMProperty(dmProperties)
     val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
     val query = sparkSession.sql(updatedQuery)
     val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
@@ -79,6 +80,7 @@ object MVHelper {
     }
     val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
     val fullRebuild = isFullReload(logicalPlan)
+    var counter = 0
     // the ctas query can have duplicate columns, so we should take distinct and create fields,
     // so that it won't fail during create mv table
     val fields = logicalPlan.output.map { attr =>
@@ -87,7 +89,8 @@ object MVHelper {
         throw new UnsupportedOperationException(
           s"MV datamap is unsupported for ComplexData type column: " + attr.name)
       }
-      val name = updateColumnName(attr)
+      val name = updateColumnName(attr, counter)
+      counter += 1
       val rawSchema = '`' + name + '`' + ' ' + attr.dataType.typeName
       if (attr.dataType.typeName.startsWith("decimal")) {
         val (precision, scale) = CommonUtil.getScaleAndPrecision(attr.dataType.catalogString)
@@ -137,7 +140,7 @@ object MVHelper {
     tableProperties.put(CarbonCommonConstants.DATAMAP_NAME, dataMapSchema.getDataMapName)
     tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(","))
 
-    val fieldRelationMap = MVUtil.getFieldsAndDataMapFieldsFromPlan(
+    val fieldRelationMap = mvUtil.getFieldsAndDataMapFieldsFromPlan(
       logicalPlan, queryString, sparkSession)
     // If dataMap is mapped to single main table, then inherit table properties from main table,
     // else, will use default table properties. If DMProperties contains table properties, then
@@ -329,19 +332,22 @@ object MVHelper {
     modularPlan.asCompactSQL
   }
 
-  def getUpdatedName(name: String): String = {
-    val updatedName = name.replace("(", "_")
+  def getUpdatedName(name: String, counter: Int): String = {
+    var updatedName = name.replace("(", "_")
       .replace(")", "")
       .replace(" ", "_")
      .replace("=", "")
       .replace(",", "")
       .replace(".", "_")
       .replace("`", "")
+    if (updatedName.length >= CarbonCommonConstants.MAXIMUM_CHAR_LENGTH) {
+      updatedName = updatedName.substring(0, 110) + CarbonCommonConstants.UNDERSCORE + counter
+    }
     updatedName
   }
 
-  def updateColumnName(attr: Attribute): String = {
-    val name = getUpdatedName(attr.name)
+  def updateColumnName(attr: Attribute, counter: Int): String = {
+    val name = getUpdatedName(attr.name, counter)
     attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name)
   }
 
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index 8cb2f1fc57e..f4bb33f90f9 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -35,7 +35,9 @@ import org.apache.carbondata.spark.util.CommonUtil
 /**
  * Utility class for keeping all the utility method for mv datamap
  */
-object MVUtil {
+class MVUtil {
+
+  var counter = 0
 
   /**
    * Below method will be used to validate and get the required fields from select plan
@@ -127,9 +129,9 @@
     aggExp.map { agg =>
       var aggregateType: String = ""
       val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
+      var isLiteralPresent = false
       agg.collect {
         case Alias(attr: AggregateExpression, name) =>
-          var isLiteralPresent = false
           attr.aggregateFunction.collect {
             case l@Literal(_, _) =>
               isLiteralPresent = true
@@ -175,7 +177,7 @@
           }
         }
       }
-      if (!aggregateType.isEmpty && arrayBuffer.nonEmpty) {
+      if (!aggregateType.isEmpty && arrayBuffer.nonEmpty && !isLiteralPresent) {
        fieldToDataMapFieldMap +=
        getFieldToDataMapFields(agg.name,
          agg.dataType,
@@ -260,7 +262,8 @@
       aggregateType: String,
       columnTableRelationList: ArrayBuffer[ColumnTableRelation],
       parenTableName: String) = {
-    var actualColumnName = MVHelper.getUpdatedName(name)
+    var actualColumnName = MVHelper.getUpdatedName(name, counter)
+    counter += 1
     if (qualifier.isDefined) {
       actualColumnName = qualifier.map(qualifier => qualifier + "_" + name)
         .getOrElse(actualColumnName)
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index d136b274369..c9a2206f91d 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -1092,6 +1092,23 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     assert(TestUtil.verifyMVDataMap(analyzed4, "constant_mv"))
   }
 
+  test("test datamap column having more than 128 characters") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable (m_month smallint, c_code string, " +
+        "c_country smallint, d_dollar_value double, q_quantity double, u_unit smallint, b_country smallint, i_id int, y_year smallint) stored by 'carbondata'")
+    sql("insert into maintable select 10, 'xxx', 123, 456, 45, 5, 23, 1, 2000")
+    sql("drop datamap if exists da_agg")
+    sql("create datamap da_agg using 'mv' as select u_unit, y_year, m_month, c_country, b_country, sum(case when i_id=1 and (y_year=2000 and m_month=10)" +
+        "then d_dollar_value else 0 end), sum(case when i_id=1 and (y_year=2000 and m_month=10) then q_quantity else 0 end) ex, sum(case when i_id=1 and (y_year=2011 and " +
+        "(m_month>=7 and m_month <=12)) then q_quantity else 0 end) from maintable group by u_unit, y_year, m_month, c_country, b_country")
+    val df = sql("select u_unit, y_year, m_month, c_country, b_country, sum(case when i_id=1 and (y_year=2000 and m_month=10) then d_dollar_value else 0 end), " +
+        "sum(case when i_id=1 and (y_year=2000 and m_month=10) then q_quantity else 0 end) ex, sum(case when i_id=1 and (y_year=2011 and (m_month>=7 and m_month " +
+        "<=12)) then q_quantity else 0 end) from maintable group by u_unit,y_year, m_month, c_country, b_country")
+    val analyzed = df.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed, "da_agg"))
+    sql("drop table IF EXISTS maintable")
+  }
+
   def drop(): Unit = {
     sql("drop table IF EXISTS fact_table1")
     sql("drop table IF EXISTS fact_table2")